darenwkt commented on PR #409:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1292536844
> > Hi @morhidi,
> > I have updated the stackTrace to be a string and added a ConfigOption to
limit the length of the stackTrace string.
> > Regarding the concern on deserializing the error status into a valid
json field, I have tested that deserialization back into the
FlinkResourceException class works. My testing was done as follows:
> >
> > 1. `kubectl get flinkdeployment basic-example -o yaml > test.yaml`
> > 2. Tested deserializing the `test.yaml` back into FlinkResourceException
class using the following code:
> >
> > ```
> > @Test
> > public void testYamlDeserialization() throws IOException {
> >     // Load the CR dumped by kubectl into a generic map.
> >     Yaml yaml = new Yaml();
> >     Map<String, Object> obj;
> >     try (InputStream inputStream =
> >             getClass().getClassLoader().getResourceAsStream("test.yaml")) {
> >         obj = yaml.load(inputStream);
> >     }
> >     System.out.println("deserialized yaml: " + obj);
> >
> >     // status.error holds the exception as a JSON string; parse it back.
> >     @SuppressWarnings("unchecked")
> >     Map<String, Object> status = (Map<String, Object>) obj.get("status");
> >     ObjectMapper mapper = new ObjectMapper();
> >     FlinkResourceException ex =
> >             mapper.readValue((String) status.get("error"), FlinkResourceException.class);
> >     System.out.println("deserialized json: " + ex);
> > }
> > ```
> >
> > 3. Results of System.out.println are:
> >
> > ```
> > deserialized yaml: {apiVersion=flink.apache.org/v1beta1,
kind=FlinkDeployment,
metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
> > ```
> >
> > ```
> > deserialized json:
FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException,
message=org.apache.flink.client.deployment.ClusterDeploymentException: Could
not create Kubernetes cluster "basic-example"., stackTraceElements=null,
additionalMetadata=null,
throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException,
message=Could not create Kubernetes cluster "basic-example".,
stackTraceElements=null, additionalMetadata=null, throwableList=null),
FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException,
message=Failure executing: POST at:
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message:
Deployment.apps "basic-example" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found:
"downloads"]. Received status: Status(apiVersion=v1, code=422,
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
message=Not found: "flink-logs", reason=FieldValueNotFound,
additionalProperties={}),
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name,
message=Not found: "downloads", reason=FieldValueNotFound,
additionalProperties={})], group=apps, kind=Deployment, name=basic-example,
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status,
message=Deployment.apps "basic-example" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found:
"downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null,
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid,
status=Failure, additionalProperties={})., stackTraceElements=null,
additionalMetadata=null, throwableList=null)])
> > ```
> >
> > As a result, I can confirm that deserialization of the JSON works. The
question now is whether we are OK with the format in which the error field
currently appears in the CR yaml, which includes escape characters. From what I
found, the cause is that the string is stored in the yaml with single quotes ''
instead of double quotes "". See
https://www.baeldung.com/yaml-multi-line#quoting. I have tried looking at
openAPIV3Schema, but I don't see a straightforward way to change it to double
quotes. I also suspect this could be related to how the Kubernetes API
serializes the CR into yaml.
>
> @darenwkt did you manage to properly parse the output of `kubectl get
flinkdeployment basic-example -o yaml > test.yaml` back into a FlinkDeployment
CR too? It looks like the JSON breaks up the yaml structure, so I'm more worried
about parsing the whole CR, not just the JSON within. :)
>
> On my cluster the error/stacktrace appears to be a multiline nested yaml:
>
> <img alt="image" width="1714"
src="https://user-images.githubusercontent.com/53612764/198031836-86743a90-66ed-4f70-ad8e-981f99550e5d.png">
>
> Which is not a big deal if we can parse the CR back properly without
losing any content.
Hi @morhidi,
No problem, I have tested the deserialization of `test.yaml` into
`FlinkDeployment.class` and can confirm that it's able to deserialize
correctly. My testing for this is as follows:
```
// Parse the whole CR with Jackson's YAML support.
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
FlinkDeployment flinkDeployment =
        mapper.readValue(new File("src/test/resources/test.yaml"), FlinkDeployment.class);
System.out.println("deserialized flinkDeployment: " + flinkDeployment);
System.out.println(
        "deserialized flinkDeployment error: " + flinkDeployment.getStatus().getError());
```
The results are:
```
deserialized flinkDeployment: CustomResource{kind='FlinkDeployment',
apiVersion='flink.apache.org/v1beta1',
metadata=ObjectMeta(annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
}, clusterName=null, creationTimestamp=2022-10-24T21:50:52Z,
deletionGracePeriodSeconds=null, deletionTimestamp=null,
finalizers=[flinkdeployments.flink.apache.org/finalizer], generateName=null,
generation=2, labels=null, managedFields=[], name=basic-example,
namespace=default, ownerReferences=[], resourceVersion=3127914, selfLink=null,
uid=200aec43-2bf7-44a7-a020-188ea091ff9f, additionalProperties={}),
spec=FlinkDeploymentSpec(super=AbstractFlinkSpec(job=JobSpec(jarURI=local:///opt/flink/examples/streaming/StateMachineExample.jar,
parallelism=2, entryClass=null, args=[], state=RUNNING,
savepointTriggerNonce=null, initialSavepointPath=null, upgradeMode=STATELESS,
allowNonRestoredState=null), restartNonce=null,
flinkConfiguration={taskmanager.numberOfTaskSlots=2}), image=flink:1.15,
imagePullPolicy=null, serviceAccount=flink, flinkVersion=v1_15, ingress=null,
podTemplate=Pod(apiVersion=v1, kind=Pod, metadata=ObjectMeta(annotations=null,
clusterName=null, creationTimestamp=null,
deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[],
generateName=null, generation=null, labels=null, managedFields=[],
name=pod-template, namespace=null, ownerReferences=[], resourceVersion=null,
selfLink=null, uid=null, additionalProperties={}),
spec=PodSpec(activeDeadlineSeconds=null, affinity=null,
automountServiceAccountToken=null, containers=[Container(args=[], command=[],
env=[], envFrom=[], image=null, imagePullPolicy=null, lifecycle=null,
livenessProbe=null, name=flink-main-container, ports=[], readinessProbe=null,
resources=null, securityContext=null, startupProbe=null, stdin=null,
stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null,
tty=null, volumeDevices=[], volumeMounts=[VolumeMount(mountPath=/opt/flink/log,
mountPropagation=null, name=flink-logs, readOnly=null, subPath=null,
subPathExpr=null, additionalProperties={}),
VolumeMount(mountPath=/opt/flink/downloads, mountPropagation=null,
name=downloads, readOnly=null, subPath=null,
subPathExpr=null, additionalProperties={})], workingDir=null,
additionalProperties={})], dnsConfig=null, dnsPolicy=null,
enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null,
hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[],
initContainers=[], nodeName=null, nodeSelector=null, os=null, overhead=null,
preemptionPolicy=null, priority=null, priorityClassName=null,
readinessGates=[], restartPolicy=null, runtimeClassName=null,
schedulerName=null, securityContext=null, serviceAccount=null,
serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null,
subdomain=null, terminationGracePeriodSeconds=null, tolerations=[],
topologySpreadConstraints=[], volumes=[], additionalProperties={}),
status=null, additionalProperties={}),
jobManager=JobManagerSpec(resource=Resource(cpu=1.0, memory=2048m), replicas=1,
podTemplate=null), taskManager=TaskManagerSpec(resource=Resource(cpu=1.0,
memory=2048m), replicas=null, podTemplate=null),
logConfiguration=null, mode=null),
status=FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=null,
jobId=16ab0d806af3276492caac21c6294d49, state=null, startTime=null,
updateTime=null, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=,
triggerTimestamp=0, triggerType=UNKNOWN, formatType=UNKNOWN,
savepointHistory=[], lastPeriodicSavepointTimestamp=0)),
error={"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
Could not create Kubernetes cluster
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
not create Kubernetes cluster
\"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
executing: POST at:
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message:
Deployment.apps \"basic-example\" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not
found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422,
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
message=Not found: \"flink-logs\", reason=FieldValueNotFound,
additionalProperties={}),
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name,
message=Not found: \"downloads\", reason=FieldValueNotFound,
additionalProperties={})], group=apps, kind=Deployment, name=basic-example,
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status,
message=Deployment.apps \"basic-example\" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not
found: \"downloads\"], metadata=ListMeta(_continue=null,
remainingItemCount=null, resourceVersion=null, selfLink=null,
additionalProperties={}), reason=Invalid, status=Failure,
additionalProperties={})."}]}), clusterInfo={},
jobManagerDeploymentStatus=MISSING,
reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1666650817435,
lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}},
lastStableSpec=null, state=UPGRADING)),
taskManager=TaskManagerInfo(labelSelector=, replicas=0))}
```
```
deserialized flinkDeployment error:
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
Could not create Kubernetes cluster
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
not create Kubernetes cluster
\"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
executing: POST at:
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message:
Deployment.apps \"basic-example\" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not
found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422,
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
message=Not found: \"flink-logs\", reason=FieldValueNotFound, additionalProperties={}),
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name,
message=Not found: \"downloads\", reason=FieldValueNotFound,
additionalProperties={})], group=apps, kind=Deployment, name=basic-example,
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status,
message=Deployment.apps \"basic-example\" is invalid:
[spec.template.spec.containers[0].volumeMounts[0].name: Not found:
\"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: Not
found: \"downloads\"], metadata=ListMeta(_continue=null,
remainingItemCount=null, resourceVersion=null, selfLink=null,
additionalProperties={}), reason=Invalid, status=Failure,
additionalProperties={})."}]}
```
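On the escape characters discussed above, here is a minimal sketch of how the two YAML quoting styles treat a JSON payload such as the error field. It is plain Java with no YAML library, the class and method names are hypothetical, and it only covers single-line strings without control characters. It does not show which style the Kubernetes API or kubectl actually emits.

```java
class YamlQuotingSketch {

    // Single-quoted YAML scalar: the only escape is a doubled single quote.
    static String singleQuoted(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Double-quoted YAML scalar: backslashes and double quotes are escaped.
    // Control characters (newlines, tabs, ...) are not handled in this sketch.
    static String doubleQuoted(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Shortened, illustrative stand-in for the JSON stored in status.error.
        String json =
                "{\"message\":\"Could not create Kubernetes cluster \\\"basic-example\\\".\"}";
        System.out.println("single: " + singleQuoted(json));
        System.out.println("double: " + doubleQuoted(json));
    }
}
```

With this sample payload, the single-quoted form leaves the JSON untouched, while the double-quoted form adds a backslash before every double quote and doubles every existing backslash.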
I think the multiline yaml helps with the readability of the error field,
as long as it deserializes correctly, as you mentioned.
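For reference, the stack trace length limit mentioned at the top of this thread boils down to something like the sketch below. This is not the code from the PR: the class name, method name and `maxLength` parameter are hypothetical, and in the operator the limit would come from the ConfigOption.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class StackTraceTruncationSketch {

    // Hypothetical helper: render a Throwable's stack trace as a string and
    // cut it to at most maxLength characters.
    static String stackTraceAsString(Throwable t, int maxLength) {
        if (maxLength < 0) {
            throw new IllegalArgumentException("maxLength must be >= 0");
        }
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        t.printStackTrace(pw);
        pw.flush();
        String full = sw.toString();
        return full.length() <= maxLength ? full : full.substring(0, maxLength);
    }

    public static void main(String[] args) {
        Throwable t = new IllegalStateException("Could not create Kubernetes cluster");
        System.out.println(stackTraceAsString(t, 80));
    }
}
```

Cutting by character count keeps the status field bounded, at the cost of possibly ending in the middle of a stack frame.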