wangyang0918 commented on a change in pull request #13:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/13#discussion_r812636751
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -178,7 +150,14 @@ private void updateForReconciliationError(FlinkDeployment flinkApp, String err)
@Override
public Optional<FlinkDeployment> updateErrorStatus(
FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
- LOG.warn("TODO: handle error status");
- return Optional.empty();
+ LOG.warn(
+ "attempt count: {}, last attempt: {}",
+ retryInfo.getAttemptCount(),
+ retryInfo.isLastAttempt());
+
+ updateForReconciliationError(
+ flinkApp,
+ (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
Review comment:
Do we need to use `getMessage()` here?
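For context on the question, here is a small standalone sketch, not the operator's code. `Throwable.toString()` returns the class name followed by `": "` and the message, while `getMessage()` returns only the detail message and may be `null`. `ReconciliationException` below is a hypothetical stand-in that is assumed to wrap the real failure as its cause. `ErrorMessageDemo`, `withToString` and `withGetMessage` are illustrative names only.

```java
// Hypothetical stand-in for the operator's ReconciliationException;
// assumed here to wrap the underlying failure as its cause.
class ReconciliationException extends RuntimeException {
    ReconciliationException(Throwable cause) {
        super(cause);
    }
}

public class ErrorMessageDemo {

    // Mirrors the expression in the diff: unwrap ReconciliationException,
    // then render with toString() (fully qualified class name + ": " + message).
    static String withToString(RuntimeException e) {
        return (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString();
    }

    // The alternative raised in the review: keep only the detail message.
    // getMessage() can be null, so fall back to toString() in that case.
    static String withGetMessage(RuntimeException e) {
        Throwable t =
                (e instanceof ReconciliationException && e.getCause() != null) ? e.getCause() : e;
        return t.getMessage() != null ? t.getMessage() : t.toString();
    }

    public static void main(String[] args) {
        RuntimeException e =
                new ReconciliationException(new IllegalStateException("Job not found"));
        System.out.println(withToString(e));   // java.lang.IllegalStateException: Job not found
        System.out.println(withGetMessage(e)); // Job not found
    }
}
```

Using `getMessage()` gives a shorter status string, but it drops the exception type, and it needs the `null` fallback shown above for exceptions created without a message.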
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -94,51 +94,23 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
- if (flinkApp.getStatus() == null) {
- flinkApp.setStatus(new FlinkDeploymentStatus());
- }
Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
-
- boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-
- if (!successfulObserve) {
- // Cluster not accessible let's retry
- return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
- }
-
- if (!specChanged(flinkApp)) {
- // Successfully observed the cluster after reconciliation, no need to reschedule
- return UpdateControl.updateStatus(flinkApp);
- }
-
try {
- reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
- } catch (Exception e) {
- String err = "Error while reconciling deployment change: " + e.getMessage();
- String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
- if (!err.equals(lastErr)) {
- // Log new errors on the first instance
- LOG.error("Error while reconciling deployment change", e);
- updateForReconciliationError(flinkApp, err);
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
- } else {
- return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+ boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
Review comment:
Maybe this is out of the scope of this PR. I am afraid the root cause will
be hidden, since we will always get a `java.util.concurrent.TimeoutException`
when the Flink client fails to deploy the application.
Will this issue be resolved in FLINK-26261? cc @tweise
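The concern can be illustrated in isolation with a rough sketch that makes no claim about how FLINK-26261 addresses it. The helper below walks a throwable's `getCause()` chain to the deepest cause. This recovers the real deployment error only if that error is actually attached somewhere in the chain. When the client surfaces a bare `java.util.concurrent.TimeoutException` with no cause, the deepest cause is the timeout itself, and the original failure cannot be recovered from the exception. `RootCauseDemo`, `rootCause` and the exception messages are hypothetical.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class RootCauseDemo {

    // Follows getCause() to the deepest throwable. The identity set stops the
    // walk if a cause chain loops back on itself (possible via initCause()).
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Case 1: the real error is wrapped, so it can still be recovered.
        Throwable wrapped = new RuntimeException(
                new ExecutionException(new IllegalArgumentException("Invalid jarURI")));
        System.out.println(rootCause(wrapped));

        // Case 2: only a timeout is reported; the underlying deployment error is gone.
        Throwable timeout = new RuntimeException(new TimeoutException("Timed out"));
        System.out.println(rootCause(timeout));
    }
}
```

Unwrapping, whether through `getCause()` in `updateErrorStatus` or a helper like this one, can expose the root cause only when the client keeps that cause in the exception it reports.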
--
This is an automated message from the Apache Git Service.