darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1291703643

   Hi @morhidi,
   
   I have updated the stackTrace to be a string and added a ConfigOption to 
limit the length of the stackTrace string.
   
   Regarding the concern on deserializing the error status into a valid json 
field, I have tested that deserialization back into the FlinkResourceException 
class works. My testing was done as follows:
   
   1. `kubectl get flinkdeployment basic-example -o yaml > test.yaml`
   2. Tested deserializing the `test.yaml` back into FlinkResourceException 
class using the following code:
   
   ```
       @Test
       public void testYamlDeserialization() throws IOException {
   
           Yaml yaml = new Yaml();
           InputStream inputStream = this.getClass()
                   .getClassLoader()
                   .getResourceAsStream("test.yaml");
           Map<String, Object> obj = yaml.load(inputStream);
           System.out.println("deserialized yaml: " + obj);
   
           ObjectMapper mapper = new ObjectMapper();
           FlinkResourceException ex = mapper.readValue((String) ((Map<String, 
Object>) obj.get("status")).get("error"), FlinkResourceException.class);
           System.out.println("deserialized json: " + ex);
       }
   ```
   3. Results of System.out.println are:
   ```
   deserialized yaml: {apiVersion=flink.apache.org/v1beta1, 
kind=FlinkDeployment, 
metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"}}}}
   ```
   ```
   deserialized json: 
FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException,
 message=org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not create Kubernetes cluster "basic-example"., stackTraceElements=null, 
additionalMetadata=null, 
throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException,
 message=Could not create Kubernetes cluster "basic-example"., 
stackTraceElements=null, additionalMetadata=null, throwableList=null), 
FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException,
 message=Failure executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"]. Received status: Status(apiVer
 sion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: "flink-logs", reason=FieldValueNotFound, 
additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: "downloads", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={})., stackTraceElements=null, 
additionalMetadata=null, throwableList=null)])
   
   ```
   
   As a result, I can confirm deserialization of the json works. The question 
now is whether we are ok with the current format the error field is shown in CR 
yaml, which includes the escape field. I tried to search this up and the cause 
of this is we are storing the string with single quote '' instead of double 
quote "" in the yaml. Referring to 
https://www.baeldung.com/yaml-multi-line#quoting. I have tried to looking at 
openAPIV3Schema, but I don't see a straightforward way to change it to a double 
quote. I also suspect this could be related to how K8 api serializes the CR 
into yaml.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to