This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 9a5b50833a69322373baa7b5b1ab278519aea756
Author: benjobs <[email protected]>
AuthorDate: Mon Jan 8 23:05:54 2024 +0800

    [Improve] k8s cluster shutdown bug fixed.
---
 .../core/service/impl/FlinkClusterServiceImpl.java | 89 ++++++++--------------
 .../streampark/flink/client/FlinkClient.scala      |  4 +-
 .../flink/client/bean/DeployRequest.scala          | 47 +++++++++---
 .../flink/client/bean/ShutDownRequest.scala        | 32 --------
 .../flink/client/FlinkClientHandler.scala          |  2 +-
 .../impl/KubernetesNativeSessionClient.scala       | 59 ++++++--------
 .../flink/client/impl/YarnSessionClient.scala      |  2 +-
 7 files changed, 97 insertions(+), 138 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index fde4c00e6..f73a58290 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -35,8 +35,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
-import org.apache.streampark.flink.client.bean.KubernetesDeployParam;
-import org.apache.streampark.flink.client.bean.ShutDownRequest;
+import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
 import org.apache.streampark.flink.client.bean.ShutDownResponse;
 
 import org.apache.commons.lang3.StringUtils;
@@ -153,33 +152,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     FlinkCluster flinkCluster = getById(cluster.getId());
     try {
       ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
-      KubernetesDeployParam kubernetesDeployParam = null;
-      switch (executionModeEnum) {
-        case YARN_SESSION:
-          break;
-        case KUBERNETES_NATIVE_SESSION:
-          kubernetesDeployParam =
-              new KubernetesDeployParam(
-                  flinkCluster.getClusterId(),
-                  flinkCluster.getK8sNamespace(),
-                  flinkCluster.getK8sConf(),
-                  flinkCluster.getServiceAccount(),
-                  flinkCluster.getFlinkImage(),
-                  flinkCluster.getK8sRestExposedTypeEnum());
-          break;
-        default:
-          throw new ApiAlertException(
-              "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
start!");
-      }
-      FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-      DeployRequest deployRequest =
-          new DeployRequest(
-              flinkEnv.getFlinkVersion(),
-              executionModeEnum,
-              flinkCluster.getProperties(),
-              flinkCluster.getClusterId(),
-              kubernetesDeployParam);
-      log.info("deploy cluster request " + deployRequest);
+      DeployRequest deployRequest = getDeployRequest(flinkCluster);
+      log.info("deploy cluster request: " + deployRequest);
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
@@ -208,6 +182,33 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     }
   }
 
+  private DeployRequest getDeployRequest(FlinkCluster flinkCluster) {
+    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
+    switch (executionModeEnum) {
+      case YARN_SESSION:
+        return DeployRequest.apply(
+            flinkEnv.getFlinkVersion(),
+            executionModeEnum,
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId());
+      case KUBERNETES_NATIVE_SESSION:
+        return KubernetesDeployRequest.apply(
+            flinkEnv.getFlinkVersion(),
+            executionModeEnum,
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId(),
+            flinkCluster.getK8sNamespace(),
+            flinkCluster.getK8sConf(),
+            flinkCluster.getServiceAccount(),
+            flinkCluster.getFlinkImage(),
+            flinkCluster.getK8sRestExposedTypeEnum());
+      default:
+        throw new ApiAlertException(
+            "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
start!");
+    }
+  }
+
   @Override
   public void update(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
@@ -247,24 +248,6 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     // 1) check mode
     ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
     String clusterId = flinkCluster.getClusterId();
-    KubernetesDeployParam kubernetesDeployParam = null;
-    switch (executionModeEnum) {
-      case YARN_SESSION:
-        break;
-      case KUBERNETES_NATIVE_SESSION:
-        kubernetesDeployParam =
-            new KubernetesDeployParam(
-                flinkCluster.getClusterId(),
-                flinkCluster.getK8sNamespace(),
-                flinkCluster.getK8sConf(),
-                flinkCluster.getServiceAccount(),
-                flinkCluster.getFlinkImage(),
-                flinkCluster.getK8sRestExposedTypeEnum());
-        break;
-      default:
-        throw new ApiAlertException(
-            "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
shutdown!");
-    }
     if (StringUtils.isBlank(clusterId)) {
       throw new ApiAlertException("the clusterId can not be empty!");
     }
@@ -291,18 +274,10 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     }
 
     // 4) shutdown
-    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-    ShutDownRequest stopRequest =
-        new ShutDownRequest(
-            flinkEnv.getFlinkVersion(),
-            executionModeEnum,
-            flinkCluster.getProperties(),
-            clusterId,
-            kubernetesDeployParam);
-
+    DeployRequest deployRequest = getDeployRequest(cluster);
     try {
       Future<ShutDownResponse> future =
-          executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+          executorService.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
       if (shutDownResponse != null) {
         flinkCluster.setAddress(null);
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index c2ea9ba48..19ee9e744 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -57,8 +57,8 @@ object FlinkClient extends Logger {
     proxy[DeployResponse](deployRequest, deployRequest.flinkVersion, 
DEPLOY_REQUEST)
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
-    proxy[ShutDownResponse](shutDownRequest, shutDownRequest.flinkVersion, 
SHUTDOWN_REQUEST)
+  def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+    proxy[ShutDownResponse](deployRequest, deployRequest.flinkVersion, 
SHUTDOWN_REQUEST)
   }
 
   def triggerSavepoint(savepointRequest: TriggerSavepointRequest): 
SavepointResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index db7bbcd3d..81b36a17b 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -24,8 +24,6 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.commons.io.FileUtils
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 
-import javax.annotation.Nullable
-
 import java.io.File
 import java.util.{Map => JavaMap}
 
@@ -33,8 +31,7 @@ case class DeployRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     properties: JavaMap[String, Any],
-    clusterId: String,
-    @Nullable k8sDeployParam: KubernetesDeployParam) {
+    clusterId: String) {
 
   private[client] lazy val hdfsWorkspace = {
 
@@ -63,10 +60,38 @@ case class DeployRequest(
   }
 }
 
-case class KubernetesDeployParam(
-    clusterId: String,
-    kubernetesNamespace: String = 
KubernetesConfigOptions.NAMESPACE.defaultValue(),
-    kubeConf: String = "~/.kube/config",
-    serviceAccount: String = 
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
-    flinkImage: String = 
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
-    @Nullable flinkRestExposedType: FlinkK8sRestExposedType = 
FlinkK8sRestExposedType.CLUSTER_IP)
+class KubernetesDeployRequest(
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    val kubernetesNamespace: String = 
KubernetesConfigOptions.NAMESPACE.defaultValue(),
+    val kubeConf: String = "~/.kube/config",
+    val serviceAccount: String = 
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
+    val flinkImage: String = 
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+    val flinkRestExposedType: FlinkK8sRestExposedType = 
FlinkK8sRestExposedType.CLUSTER_IP)
+  extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+
+object KubernetesDeployRequest {
+  def apply(
+      flinkVersion: FlinkVersion,
+      executionMode: ExecutionMode,
+      properties: JavaMap[String, Any],
+      clusterId: String,
+      kubernetesNamespace: String,
+      kubeConf: String,
+      serviceAccount: String,
+      flinkImage: String,
+      flinkRestExposedType: FlinkK8sRestExposedType): KubernetesDeployRequest 
= {
+    new KubernetesDeployRequest(
+      flinkVersion,
+      executionMode,
+      properties,
+      clusterId,
+      kubernetesNamespace,
+      kubeConf,
+      serviceAccount,
+      flinkImage,
+      flinkRestExposedType)
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
deleted file mode 100644
index a6b75bbdf..000000000
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.client.bean
-
-import org.apache.streampark.common.conf.FlinkVersion
-import org.apache.streampark.common.enums.ExecutionMode
-
-import javax.annotation.Nullable
-
-import java.util.{Map => JavaMap}
-
-case class ShutDownRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    @Nullable kubernetesDeployParam: KubernetesDeployParam)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index e85ceb6ee..c6fa402e9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -70,7 +70,7 @@ object FlinkClientHandler {
     }
   }
 
-  def shutdown(request: ShutDownRequest): ShutDownResponse = {
+  def shutdown(request: DeployRequest): ShutDownResponse = {
     request.executionMode match {
       case YARN_SESSION => YarnSessionClient.shutdown(request)
       case KUBERNETES_NATIVE_SESSION => 
KubernetesNativeSessionClient.shutdown(request)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 621b7ac8a..3f2e99bde 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,6 +25,7 @@ import 
org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 import org.apache.streampark.flink.core.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.api.model.{Config => _}
@@ -134,7 +135,8 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     super.doCancel(cancelRequest, flinkConfig)
   }
 
-  def deploy(deployRequest: DeployRequest): DeployResponse = {
+  def deploy(deployReq: DeployRequest): DeployResponse = {
+    val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
     logInfo(
       s"""
          |--------------------------------------- kubernetes session start 
---------------------------------------
@@ -142,10 +144,10 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    namespace        : 
${deployRequest.k8sDeployParam.kubernetesNamespace}
-         |    exposedType      : 
${deployRequest.k8sDeployParam.flinkRestExposedType}
-         |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
-         |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
+         |    namespace        : ${deployRequest.kubernetesNamespace}
+         |    exposedType      : ${deployRequest.flinkRestExposedType}
+         |    serviceAccount   : ${deployRequest.serviceAccount}
+         |    flinkImage       : ${deployRequest.flinkImage}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -157,20 +159,16 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(
-          KubernetesConfigOptions.NAMESPACE,
-          deployRequest.k8sDeployParam.kubernetesNamespace)
-        .safeSet(
-          KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
-          deployRequest.k8sDeployParam.serviceAccount)
+        .safeSet(KubernetesConfigOptions.NAMESPACE, 
deployRequest.kubernetesNamespace)
+        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
deployRequest.serviceAccount)
         .safeSet(
           KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          
ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
+          
ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
         .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
-        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.k8sDeployParam.flinkImage)
+        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.flinkImage)
         .safeSet(
           KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
+          getDefaultKubernetesConf(deployRequest.kubeConf))
         .safeSet(
           DeploymentOptionsInternal.CONF_DIR,
           s"${deployRequest.flinkVersion.flinkHome}/conf")
@@ -205,7 +203,8 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     }
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+  def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+    val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
     var kubeClient: FlinkKubeClient = null
     try {
       val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
@@ -217,32 +216,24 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
           })
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(
-          KubernetesConfigOptions.NAMESPACE,
-          shutDownRequest.kubernetesDeployParam.kubernetesNamespace)
-        .safeSet(
-          KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
-          shutDownRequest.kubernetesDeployParam.serviceAccount)
+        .safeSet(KubernetesConfigOptions.NAMESPACE, 
shutDownRequest.kubernetesNamespace)
+        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
shutDownRequest.serviceAccount)
         .safeSet(
           KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          ServiceExposedType.valueOf(
-            
shutDownRequest.kubernetesDeployParam.flinkRestExposedType.getName))
+          
ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName))
         .safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
-        .safeSet(
-          KubernetesConfigOptions.CONTAINER_IMAGE,
-          shutDownRequest.kubernetesDeployParam.flinkImage)
+        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
shutDownRequest.flinkImage)
         .safeSet(
           KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          
getDefaultKubernetesConf(shutDownRequest.kubernetesDeployParam.kubeConf))
+          getDefaultKubernetesConf(shutDownRequest.kubeConf))
       kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-
-      if (
-        shutDownRequest.clusterId != null && kubeClientWrapper
-          .getService(shutDownRequest.clusterId)
-          .isPresent
-      ) {
+      val flinkKubeClient = new FlinkKubernetesClient(kubeClient)
+      val k8sService = flinkKubeClient.getService(shutDownRequest.clusterId)
+      if (k8sService.isPresent) {
         kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+        KubernetesDeploymentHelper.delete(
+          shutDownRequest.kubernetesNamespace,
+          shutDownRequest.clusterId)
         ShutDownResponse()
       } else {
         null
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index d2a98b740..91eb1d92d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -245,7 +245,7 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+  def shutdown(shutDownRequest: DeployRequest): ShutDownResponse = {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {

Reply via email to