darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1292536844

   
   
   
   > > Hi @morhidi,
   > > I have updated the stackTrace to be a string and added a ConfigOption to 
limit the length of the stackTrace string.
   > > Regarding the concern on deserializing the error status into a valid 
json field, I have tested that deserialization back into the 
FlinkResourceException class works. My testing was done as follows:
   > > 
   > > 1. `kubectl get flinkdeployment basic-example -o yaml > test.yaml`
   > > 2. Tested deserializing the `test.yaml` back into FlinkResourceException 
class using the following code:
   > > 
   > > ```
   > >     @Test
   > >     public void testYamlDeserialization() throws IOException {
   > > 
   > >         Yaml yaml = new Yaml();
   > >         InputStream inputStream = this.getClass()
   > >                 .getClassLoader()
   > >                 .getResourceAsStream("test.yaml");
   > >         Map<String, Object> obj = yaml.load(inputStream);
   > >         System.out.println("deserialized yaml: " + obj);
   > > 
   > >         ObjectMapper mapper = new ObjectMapper();
   > >         FlinkResourceException ex = mapper.readValue((String) 
((Map<String, Object>) obj.get("status")).get("error"), 
FlinkResourceException.class);
   > >         System.out.println("deserialized json: " + ex);
   > >     }
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > 
   > > 3. Results of System.out.println are:
   > > 
   > > ```
   > > deserialized yaml: {apiVersion=flink.apache.org/v1beta1, 
kind=FlinkDeployment, 
metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > ```
   > > deserialized json: 
FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException,
 message=org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not create Kubernetes cluster "basic-example"., stackTraceElements=null, 
additionalMetadata=null, 
throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException,
 message=Could not create Kubernetes cluster "basic-example"., 
stackTraceElements=null, additionalMetadata=null, throwableList=null), 
FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException,
 message=Failure executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"]. Received status: Status(ap
 iVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: "flink-logs", reason=FieldValueNotFound, 
additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: "downloads", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={})., stackTraceElements=null, 
additionalMetadata=null, throwableList=null)])
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > As a result, I can confirm deserialization of the json works. The 
question now is whether we are ok with the current format the error field is 
shown in CR yaml, which includes the escape field. I tried to search this up 
and the cause of this is we are storing the string with single quote '' instead 
of double quote "" in the yaml. Referring to 
https://www.baeldung.com/yaml-multi-line#quoting. I have tried to looking at 
openAPIV3Schema, but I don't see a straightforward way to change it to a double 
quote. I also suspect this could be related to how K8 api serializes the CR 
into yaml.
   > 
   > @darenwkt did you manage to properly parse the output of `kubectl get 
flinkdeployment basic-example -o yaml > test.yaml` back into a flinkdeployment 
CR too? It looks the JSON breaks up the yaml structure, so I'm worried more 
about the whole CR parsing not just the JSON within. :)
   > 
   > On my cluster the error/stacktrace appears to be a multiline nested yaml:
   > 
   > <img alt="image" width="1714" 
src="https://user-images.githubusercontent.com/53612764/198031836-86743a90-66ed-4f70-ad8e-981f99550e5d.png";>
   > 
   > Which is not a big deal if we can parse the CR back properly without 
loosing any content.
   
   Hi @morhidi,
   
   No problem, I have tested the deserialization of `test.yaml` into 
`FlinkDeployment.class` and can confirm that it's able to deserialize 
correctly. My testing for this is as follows:
   
   ```
           mapper = new ObjectMapper(new YAMLFactory());
           FlinkDeployment flinkDeployment = mapper.readValue(new 
File("src/test/resources/test.yaml"), FlinkDeployment.class);
           System.out.println("deserialized flinkDeployment: " + 
flinkDeployment);
           System.out.println("deserialized flinkDeployment error: " + 
flinkDeployment.getStatus().getError());
   ```
   
   The results are:
   ```
   deserialized flinkDeployment: CustomResource{kind='FlinkDeployment', 
apiVersion='flink.apache.org/v1beta1', 
metadata=ObjectMeta(annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
   }, clusterName=null, creationTimestamp=2022-10-24T21:50:52Z, 
deletionGracePeriodSeconds=null, deletionTimestamp=null, 
finalizers=[flinkdeployments.flink.apache.org/finalizer], generateName=null, 
generation=2, labels=null, managedFields=[], name=basic-example, 
namespace=default, ownerReferences=[], resourceVersion=3127914, selfLink=null, 
uid=200aec43-2bf7-44a7-a020-188ea091ff9f, additionalProperties={}), 
spec=FlinkDeploymentSpec(super=AbstractFlinkSpec(job=JobSpec(jarURI=local:///opt/flink/examples/streaming/StateMachineExample.jar,
 parallelism=2, entryClass=null, args=[], state=RUNNING, 
savepointTriggerNonce=null, initialSavepointPath=null, upgradeMode=STATELESS, 
allowNonRestoredState=null), restartNonce=null, 
flinkConfiguration={taskmanager.numberOfTaskSlots=2}), image=flink:1.15, 
imagePullPolicy=null, serviceAccount=flink, flinkVersion=v1_15, ingress=null, 
podTemplate=Pod(apiVersion=v1, kind=Pod, metadata=ObjectMeta(annotations=null, 
clusterName=null, creationTimestamp=null, del
 etionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], 
generateName=null, generation=null, labels=null, managedFields=[], 
name=pod-template, namespace=null, ownerReferences=[], resourceVersion=null, 
selfLink=null, uid=null, additionalProperties={}), 
spec=PodSpec(activeDeadlineSeconds=null, affinity=null, 
automountServiceAccountToken=null, containers=[Container(args=[], command=[], 
env=[], envFrom=[], image=null, imagePullPolicy=null, lifecycle=null, 
livenessProbe=null, name=flink-main-container, ports=[], readinessProbe=null, 
resources=null, securityContext=null, startupProbe=null, stdin=null, 
stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, 
tty=null, volumeDevices=[], volumeMounts=[VolumeMount(mountPath=/opt/flink/log, 
mountPropagation=null, name=flink-logs, readOnly=null, subPath=null, 
subPathExpr=null, additionalProperties={}), 
VolumeMount(mountPath=/opt/flink/downloads, mountPropagation=null, 
name=downloads, readOnly=null, subPath=null, s
 ubPathExpr=null, additionalProperties={})], workingDir=null, 
additionalProperties={})], dnsConfig=null, dnsPolicy=null, 
enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, 
hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], 
initContainers=[], nodeName=null, nodeSelector=null, os=null, overhead=null, 
preemptionPolicy=null, priority=null, priorityClassName=null, 
readinessGates=[], restartPolicy=null, runtimeClassName=null, 
schedulerName=null, securityContext=null, serviceAccount=null, 
serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, 
subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], 
topologySpreadConstraints=[], volumes=[], additionalProperties={}), 
status=null, additionalProperties={}), 
jobManager=JobManagerSpec(resource=Resource(cpu=1.0, memory=2048m), replicas=1, 
podTemplate=null), taskManager=TaskManagerSpec(resource=Resource(cpu=1.0, 
memory=2048m), replicas=null, podTemplate=null), logConfigur
 ation=null, mode=null), 
status=FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=null,
 jobId=16ab0d806af3276492caac21c6294d49, state=null, startTime=null, 
updateTime=null, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=, 
triggerTimestamp=0, triggerType=UNKNOWN, formatType=UNKNOWN, 
savepointHistory=[], lastPeriodicSavepointTimestamp=0)), 
error={"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps \"basic-example\" is invali
 d: [spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not 
found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: \"flink-logs\", reason=FieldValueNotFound, 
additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: \"downloads\", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps \"basic-example\" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not 
found: \"downloads\"], metadata=ListMeta(_continue=null, 
remainingItemCount=null, resourceVersion=null, selfLi
 nk=null, additionalProperties={}), reason=Invalid, status=Failure, 
additionalProperties={})."}]}), clusterInfo={}, 
jobManagerDeploymentStatus=MISSING, 
reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1666650817435,
 
lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloa
 
ds"}]}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}},
 lastStableSpec=null, state=UPGRADING)), 
taskManager=TaskManagerInfo(labelSelector=, replicas=0))}
   ```
   ```
   deserialized flinkDeployment error: 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps \"basic-example\" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not 
found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found
 : \"flink-logs\", reason=FieldValueNotFound, additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: \"downloads\", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps \"basic-example\" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not 
found: \"downloads\"], metadata=ListMeta(_continue=null, 
remainingItemCount=null, resourceVersion=null, selfLink=null, 
additionalProperties={}), reason=Invalid, status=Failure, 
additionalProperties={})."}]}
   
   ```
   
   I think having the multiline yaml helps with readability of the error field 
as long as it's able to deserialize correctly as what you have mentioned.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to