gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901132589


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java:
##########
@@ -122,4 +144,45 @@ public <T extends CustomResource<?, STATUS>> void removeCachedStatus(T resource)
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
         return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
     }
+
+    public static <S extends CommonStatus<?>> StatusHelper<S> create(
+            KubernetesClient kubernetesClient, Collection<FlinkResourceListener> listeners) {
+        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+                (resource, previousStatus) -> {
+                    var ctx =
+                            new FlinkResourceListener.StatusUpdateContext() {
+                                @Override
+                                public S getPreviousStatus() {
+                                    return previousStatus;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, S> getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return kubernetesClient;
+                                }
+                            };
+
+                    listeners.forEach(
+                            listener -> {
+                                try {
+                                    if (resource instanceof FlinkDeployment) {
+                                        listener.onDeploymentStatusUpdate(ctx);
+                                    } else {
+                                        listener.onSessionJobStatusUpdate(ctx);
+                                    }
+                                } catch (Exception e) {
+                                    LOG.error(

Review Comment:
   You are probably right. This might be a key integration point, so propagating the error makes sense.
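
   To make the trade-off concrete, here is a minimal sketch of the two options. It is only an illustration: `StatusListener`, `notifySwallowing` and `notifyPropagating` are hypothetical stand-ins, not the operator's actual `FlinkResourceListener` API. It also assumes that the `catch` block in the hunk above only logs the exception, which the truncated diff does not show in full.

   ```java
   import java.util.List;

   public class ListenerPropagationSketch {

       /** Hypothetical stand-in for a status listener; not the operator's real interface. */
       interface StatusListener {
           void onStatusUpdate(String resourceName) throws Exception;
       }

       /** Swallowing variant: a failing listener is logged and the remaining listeners still run. */
       static void notifySwallowing(List<StatusListener> listeners, String resourceName) {
           for (StatusListener listener : listeners) {
               try {
                   listener.onStatusUpdate(resourceName);
               } catch (Exception e) {
                   // The caller never learns that the listener failed.
                   System.err.println(
                           "Listener failed for " + resourceName + ": " + e.getMessage());
               }
           }
       }

       /** Propagating variant: the first failure stops the loop and surfaces to the caller. */
       static void notifyPropagating(List<StatusListener> listeners, String resourceName) {
           for (StatusListener listener : listeners) {
               try {
                   listener.onStatusUpdate(resourceName);
               } catch (RuntimeException e) {
                   // Unchecked exceptions are rethrown unchanged.
                   throw e;
               } catch (Exception e) {
                   // Checked exceptions are wrapped so the method signature stays unchanged.
                   throw new IllegalStateException(
                           "Status listener failed for " + resourceName, e);
               }
           }
       }
   }
   ```

   With the propagating variant, one broken listener prevents later listeners from being notified, so whether that is acceptable depends on how critical the integration is.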



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.