This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2d998f181dbeec0a5efb42e2bf94329897e8e150
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Wed Aug 26 22:49:25 2020 +0800

    KYLIN-4695 Automatically start sparder (for query) application when start 
kylin instance
---
 .../kylin/job/execution/AbstractExecutable.java    |  4 +-
 .../org/apache/kylin/job/execution/Idempotent.java |  2 +-
 .../kylin/engine/spark/job/NSparkCubingStep.java   |  9 ++--
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  |  8 +--
 .../kylin/rest/init/InitialSparkerContext.java     | 59 ++++++++++++++++++++++
 server/src/main/resources/applicationContext.xml   |  1 +
 6 files changed, 74 insertions(+), 9 deletions(-)

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 33d42f3..7f6ba2f 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -199,7 +199,7 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
                     logger.error("error running Executable: {}", 
this.toString());
                     catchedException = e;
                 } finally {
-                    cleanup();
+                    cleanup(result);
                 }
                 retry++;
                 realException = catchedException != null ? catchedException
@@ -251,7 +251,7 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
     protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException, PersistentException;
 
     @Override
-    public void cleanup() throws ExecuteException {
+    public void cleanup(ExecuteResult result) throws ExecuteException {
 
     }
 
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
index 98c950e..49d73a4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
@@ -24,5 +24,5 @@ import org.apache.kylin.job.exception.ExecuteException;
  */
 public interface Idempotent {
 
-    void cleanup() throws ExecuteException;
+    void cleanup(ExecuteResult result) throws ExecuteException;
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index a235290..074ff17 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -123,9 +124,11 @@ public class NSparkCubingStep extends NSparkExecutable {
     }
 
     @Override
-    public void cleanup() throws ExecuteException {
+    public void cleanup(ExecuteResult result) throws ExecuteException {
         // delete job tmp dir
-        PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
-                getParam(MetadataConstants.P_JOB_ID));
+        if (result != null && result.state().ordinal() == 
ExecuteResult.State.SUCCEED.ordinal()) {
+            PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
+                    getParam(MetadataConstants.P_JOB_ID));
+        }
     }
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index b3d6a0c..2ccd810 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -74,9 +74,11 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep 
extends NSparkExecutable {
     }
 
     @Override
-    public void cleanup() throws ExecuteException {
+    public void cleanup(ExecuteResult result) throws ExecuteException {
         // delete job tmp dir
-        PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
-                getParam(MetadataConstants.P_JOB_ID));
+        if (result != null && result.state().ordinal() == 
ExecuteResult.State.SUCCEED.ordinal()) {
+            PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
+                    getParam(MetadataConstants.P_JOB_ID));
+        }
     }
 }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java
 
b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java
new file mode 100644
index 0000000..34f977a
--- /dev/null
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.init;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.spark.sql.SparderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+/**
+ * Created by zhangzc on 8/26/20.
+ */
+public class InitialSparkerContext implements InitializingBean {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(InitialSparkerContext.class);
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        runInitialSparder();
+    }
+
+    private void runInitialSparder() {
+        logger.info("Spark is starting.....");
+        SparderContext.init();
+        final String kylinHome = 
StringUtils.defaultIfBlank(KylinConfig.getKylinHome(), "./");
+        final File appidFile = Paths.get(kylinHome, "sparkappid").toFile();
+        String appid = null;
+        try {
+            appid = 
SparderContext.getSparkSession().sparkContext().applicationId();
+            FileUtils.writeStringToFile(appidFile, appid);
+            logger.info("Spark application id is {}", appid);
+        } catch (Exception e) {
+            logger.error("Failed to generate spark application id[{}] file",
+                    StringUtils.defaultString(appid), e);
+        }
+    }
+}
diff --git a/server/src/main/resources/applicationContext.xml 
b/server/src/main/resources/applicationContext.xml
index b873622..dcf249b 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -38,6 +38,7 @@
     <aop:aspectj-autoproxy/>
 
     <bean class="org.apache.kylin.rest.init.InitialTaskManager" />
+    <bean class="org.apache.kylin.rest.init.InitialSparkerContext" />
 
     <context:component-scan base-package="org.apache.kylin.rest"/>
 

Reply via email to