This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d3ecc5a0847da26178e84cf13d55c0378764452e Author: Guoliang Sun <guoliang....@kyligence.io> AuthorDate: Thu Nov 10 15:13:06 2022 +0800 KYLIN-5398 Flame graph availability fix --- .../libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes .../libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes build/async-profiler-lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes build/release/compress.sh | 5 ++ .../java/org/apache/kylin/common/KapConfig.java | 13 +++- .../org/apache/kylin/common/KylinConfigBase.java | 11 ++- .../kylin/common/asyncprofiler/AsyncArchUtil.java | 77 +++++++++++++++++++++ .../kylin/common/asyncprofiler/AsyncProfiler.java | 69 +++++++++++++----- .../common/asyncprofiler/AsyncProfilerUtils.java | 4 ++ .../libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes .../libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes .../async-profiler-lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes .../async-profiler-lib/linux64/libasyncProfiler.so | Bin 314098 -> 0 bytes .../async-profiler-lib/macOS/libasyncProfiler.so | Bin 239460 -> 0 bytes .../AsyncProfilerExecutorPlugin.scala | 8 ++- .../common/asyncprofiler/AsyncProfilerTool.scala | 8 ++- .../apache/kylin/common/KylinConfigBaseTest.java | 1 + .../common/asyncprofiler/AsyncArchUtilTest.java} | 34 ++++----- .../common/asyncprofiler/AsyncProfilerTest.java | 17 +++-- .../asyncprofiler/AsyncProfilerToolTest.java | 24 +++++++ .../asyncprofiler/AsyncProfilerUtilsTest.java | 16 +++++ .../org/apache/kylin/rest/service/JobService.java | 3 +- .../localmeta/lib/libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes .../localmeta/lib/libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes .../localmeta/lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes .../kylin/engine/spark/job/NSparkExecutable.java | 55 +++++++++------ .../kylin/query/asyncprofiler/AsyncProfiling.scala | 9 ++- .../QueryAsyncProfilerDriverPlugin.scala | 7 +- .../query/asyncprofiler/AsyncPluginWithMeta.scala | 8 +-- .../query/asyncprofiler/AsyncProfilingTest.scala | 45 +++++------- .../QueryAsyncProfilerDriverPluginTest.scala | 27 ++------ .../BuildAsyncProfilerDriverPlugin.scala | 9 ++- 32 files changed, 318 insertions(+), 132 deletions(-) diff --git a/build/async-profiler-lib/libasyncProfiler-linux-arm64.so b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so new file mode 100644 index 0000000000..b959823506 Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so differ diff --git a/build/async-profiler-lib/libasyncProfiler-linux-x64.so b/build/async-profiler-lib/libasyncProfiler-linux-x64.so new file mode 100644 index 0000000000..6d961cec0d Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-x64.so differ diff --git a/build/async-profiler-lib/libasyncProfiler-mac.so b/build/async-profiler-lib/libasyncProfiler-mac.so new file mode 100644 index 0000000000..ad45237d5f Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-mac.so differ diff --git a/build/release/compress.sh b/build/release/compress.sh index 61c7f9eb9d..bf9d584b92 100755 --- a/build/release/compress.sh +++ b/build/release/compress.sh @@ -43,6 +43,11 @@ if [[ -d "influxdb" ]]; then cp -rf postgresql ${package_name}/ fi +# copy async profiler native files +cp -rf async-profiler-lib/libasyncProfiler-mac.so "${package_name}"/lib/libasyncProfiler-mac.so +cp -rf async-profiler-lib/libasyncProfiler-linux-x64.so "${package_name}"/lib/libasyncProfiler-linux-x64.so +cp -rf async-profiler-lib/libasyncProfiler-linux-arm64.so "${package_name}"/lib/libasyncProfiler-linux-arm64.so + # Add ssb data preparation files mkdir -p ${package_name}/tool/ssb cp -rf ../src/examples/sample_cube/data ${package_name}/tool/ssb/ diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java index 6cb7cbe9b2..06123ccf6c 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java @@ -22,12 +22,14 @@ import java.io.File; import java.io.IOException; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.annotation.Clarification; import org.apache.kylin.common.util.EncryptUtil; import org.apache.kylin.common.util.FileUtils; +@Slf4j @Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") public class KapConfig { @@ -248,7 +250,7 @@ public class KapConfig { } /** - * Smart modeling + * Smart modeling */ public String getSmartModelingConf(String conf) { return config.getOptional("kylin.smart.conf." + conf, null); @@ -515,16 +517,21 @@ public class KapConfig { public String sparderFiles() { try { - File storageFile = new File(getKylinConfig().getLogSparkExecutorPropertiesFile()); + File storageFile = new File(config.getLogSparkExecutorPropertiesFile()); String additionalFiles = storageFile.getCanonicalPath(); - storageFile = new File(getKylinConfig().getLogSparkAppMasterPropertiesFile()); + storageFile = new File(config.getLogSparkAppMasterPropertiesFile()); if (additionalFiles.isEmpty()) { additionalFiles = storageFile.getCanonicalPath(); } else { additionalFiles = additionalFiles + "," + storageFile.getCanonicalPath(); } + if (config.asyncProfilingEnabled()) { + additionalFiles = additionalFiles + "," + config.getAsyncProfilerFiles(); + } + log.info("Sparder additionalFiles: {}", additionalFiles); return config.getOptional("kylin.query.engine.sparder-additional-files", additionalFiles); } catch (IOException e) { + log.error("Add sparderFiles failed, " + e); return ""; } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 0802fc242e..557901c837 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -19,6 +19,8 @@ package org.apache.kylin.common; import static java.lang.Math.toIntExact; +import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_ARM64; +import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_X64; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_CONNECTION_URL_KEY; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_DRIVER_KEY; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_PASS_KEY; @@ -2636,6 +2638,13 @@ public abstract class KylinConfigBase implements Serializable { return getLogPropertyFile("spark-appmaster-log4j.xml"); } + public String getAsyncProfilerFiles() throws IOException { + String kylinHome = getKylinHomeWithoutWarn(); + File libX64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_X64); + File libArm64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_ARM64); + return libX64.getCanonicalPath() + "," + libArm64.getCanonicalPath(); + } + private String getLogPropertyFile(String filename) { String parentFolder; if (isDevEnv()) { @@ -3597,7 +3606,7 @@ public abstract class KylinConfigBase implements Serializable { public boolean buildJobProfilingEnabled() { return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE)) - && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", FALSE)); + && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", TRUE)); } public long buildJobProfilingResultTimeout() { diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java new file mode 100644 index 0000000000..a1b77ab622 --- /dev/null +++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.asyncprofiler; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +public class AsyncArchUtil { + + public enum ArchType { + LINUX_X64(), LINUX_ARM64() + } + + private static final Map<String, ArchType> ARCH_TO_PROCESSOR = new HashMap<>(); + + static { + addProcessors(ArchType.LINUX_X64, "x86_64", "amd64"); + addProcessors(ArchType.LINUX_ARM64, "aarch64"); + } + + public static ArchType getProcessor() { + return getProcessor(getSystemProperty("os.arch")); + } + + /** + * Returns a {@link ArchType} object with the given value {@link String}. The {@link String} must be + * like a value returned by the os.arch System Property. + * + * @param osArch A {@link String} like a value returned by the os.arch System Property. + * @return A {@link ArchType} when it exists, else {@code null}. + */ + public static ArchType getProcessor(final String osArch) { + return ARCH_TO_PROCESSOR.get(osArch); + } + + /** + * <p> + * Gets a System property, defaulting to {@code null} if the property cannot be read. + * </p> + * <p> + * If a {@code SecurityException} is caught, the return value is {@code null} and a message is written to + * {@code System.err}. + * </p> + * + * @param property the system property name + * @return the system property value or {@code null} if a security problem occurs + */ + private static String getSystemProperty(final String property) { + try { + return System.getProperty(property); + } catch (final SecurityException ex) { + // we are not allowed to look at this property, the SystemUtils property value will default to null. + return null; + } + } + + private static void addProcessors(ArchType archType, final String... keys) { + Stream.of(keys).forEach(key -> ARCH_TO_PROCESSOR.put(key, archType)); + } +} \ No newline at end of file diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java index c49cdffe20..a0ba5f13a1 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java @@ -18,27 +18,31 @@ package org.apache.kylin.common.asyncprofiler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.Objects; +import org.apache.kylin.common.KylinConfigBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class AsyncProfiler { private static final Logger logger = LoggerFactory.getLogger(AsyncProfiler.class); - private static final String LIB_FILE = "libasyncProfiler.so"; - private static final String LIB_PARENT = "/async-profiler-lib"; - private static final String MAC_LIB_PATH = LIB_PARENT + "/macOS/" + LIB_FILE; - private static final String LINUX_64_LIB_PATH = LIB_PARENT + "/linux64/" + LIB_FILE; + // async profiler native files + public static final String ASYNC_PROFILER_LIB_MAC = "libasyncProfiler-mac.so"; + public static final String ASYNC_PROFILER_LIB_LINUX_X64 = "libasyncProfiler-linux-x64.so"; + public static final String ASYNC_PROFILER_LIB_LINUX_ARM64 = "libasyncProfiler-linux-arm64.so"; + private static final String LIB_PARENT = "/async-profiler-lib/"; private static AsyncProfiler profiler; private boolean loaded = false; - public static synchronized AsyncProfiler getInstance() { + public static synchronized AsyncProfiler getInstance(boolean loadLocalLib) { if (profiler == null) { - profiler = new AsyncProfiler(); + profiler = new AsyncProfiler(loadLocalLib); } return profiler; } @@ -55,14 +59,43 @@ public class AsyncProfiler { logger.info("Test arg for ut: {}", ignore); } - private AsyncProfiler() { + private AsyncProfiler(boolean loadLocalLib) { try { boolean isTestingOnLocalMac = System.getProperty("os.name", "").contains("Mac") || System.getProperty("os.name", "").contains("OS X"); if (isTestingOnLocalMac) { - loadLibAsyncProfilerSO(MAC_LIB_PATH); + loadLibAsyncProfilerSO(LIB_PARENT + ASYNC_PROFILER_LIB_MAC); } else { - loadLibAsyncProfilerSO(LINUX_64_LIB_PATH); + String libName; + File libPath; + + // Select native lib loading based on machine architecture + AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor(); + logger.info("Machine's archType: {}", archType); + switch (archType) { + case LINUX_ARM64: + libName = ASYNC_PROFILER_LIB_LINUX_ARM64; + break; + case LINUX_X64: + default: + libName = ASYNC_PROFILER_LIB_LINUX_X64; + break; + } + + // Adapting load paths based on Spark deployment patterns + if (loadLocalLib) { + libPath = new File(KylinConfigBase.getKylinHome() + "/lib/" + libName); + } else { + libPath = new File(libName); + } + logger.info("AsyncProfiler libPath: {}, exists: {}", libPath.getAbsolutePath(), + Files.exists(libPath.toPath())); + // check this for ut + if (libPath.exists()) { + System.load(libPath.getAbsolutePath()); + } else { + loadLibAsyncProfilerSO(LIB_PARENT + libName); + } } loaded = true; } catch (Exception e) { @@ -71,12 +104,12 @@ public class AsyncProfiler { } private void loadLibAsyncProfilerSO(String libPath) throws IOException { - final java.nio.file.Path tmpLib = java.io.File.createTempFile("libasyncProfiler", ".so").toPath(); - java.nio.file.Files.copy( - Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)), - tmpLib, - java.nio.file.StandardCopyOption.REPLACE_EXISTING); - System.load(tmpLib.toAbsolutePath().toString()); + File asyncProfilerLib = File.createTempFile("libasyncProfiler", ".so"); + java.nio.file.Files.copy(Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)), + asyncProfilerLib.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING); + logger.info("AsyncProfiler will try to load from libPath: {}, exists: {}", asyncProfilerLib.getAbsolutePath(), + asyncProfilerLib.exists()); + System.load(asyncProfilerLib.getAbsolutePath()); } public boolean isLoaded() { diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java index e86648268a..cd74a7cd33 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java @@ -53,6 +53,10 @@ public class AsyncProfilerUtils { this.cachedResult = countDownLatch; } + public void build(File localCacheDir) { + this.localCacheDir = localCacheDir; + } + public void build(long resultCollectionTimeout, File localCacheDir) { this.resultCollectionTimeout = resultCollectionTimeout; this.localCacheDir = localCacheDir; diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so new file mode 100644 index 0000000000..b959823506 Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so differ diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so new file mode 100644 index 0000000000..6d961cec0d Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so differ diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so new file mode 100644 index 0000000000..ad45237d5f Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so differ diff --git a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so deleted file mode 100755 index 4153f52868..0000000000 Binary files a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so and /dev/null differ diff --git a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so deleted file mode 100755 index c7298c6b82..0000000000 Binary files a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so and /dev/null differ diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala index fe4a3918a2..e423c98f81 100644 --- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala +++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala @@ -20,6 +20,7 @@ package org.apache.kylin.common.asyncprofiler import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.kylin.common.asyncprofiler.Message._ +import org.apache.kylin.common.util.ExecutorServiceUtil import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext} import org.apache.spark.internal.Logging @@ -40,7 +41,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging { val profile = new Runnable { override def run(): Unit = checkAndProfile() } - log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}") + AsyncProfilerTool.loadAsyncProfilerLib(false) + log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}") scheduledExecutorService.scheduleWithFixedDelay( profile, 0, checkingInterval, TimeUnit.MILLISECONDS) } @@ -80,4 +82,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging { ctx.send(msg) } + override def shutdown(): Unit = { + ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3) + super.shutdown() + } } diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala index 0deb703da9..69bb7c72bb 100644 --- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala +++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala @@ -27,7 +27,13 @@ object AsyncProfilerTool { val log: Logger = LoggerFactory.getLogger(AsyncProfilerTool.getClass) - private val profiler = AsyncProfiler.getInstance() + private var profiler: AsyncProfiler = _ + + def loadAsyncProfilerLib(loadLocalLib: Boolean): Unit = { + // Local - load Sparder Driver or (Spark Driver which in client mode) + // Remote - load all Executors or (Spark Engine which in cluster mode) + profiler = AsyncProfiler.getInstance(loadLocalLib) + } private var _running = false diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index f2c3cb1a2f..f60a2b003b 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1191,6 +1191,7 @@ class KylinConfigBaseTest { @Test void testBuildJobProfilingEnabled() { KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.engine.async-profiler-enabled", "false"); assertFalse(config.buildJobProfilingEnabled()); config.setProperty("kylin.engine.async-profiler-enabled", "true"); assertTrue(config.buildJobProfilingEnabled()); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java similarity index 50% copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala copy to src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java index 68f344186b..0ba16c4a75 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala +++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java @@ -16,29 +16,25 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.common.asyncprofiler; -import org.apache.spark.SparkContext -import org.apache.spark.api.plugin.{DriverPlugin, PluginContext} -import org.apache.spark.internal.Logging +import org.junit.Assert; +import org.junit.Test; -import java.util +public class AsyncArchUtilTest { -class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging { - - override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext) + @Test + public void testGetProcessor() { + AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor(); + Assert.assertNotNull(archType); + } - override def receive(message: Any): AnyRef = { - import org.apache.kylin.common.asyncprofiler.Message._ + @Test + public void testArchType() { + AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor("x86_64"); + Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_X64, archType); - val (command, executorId, param) = processMessage(message.toString) - command match { - case NEXT_COMMAND => - AsyncProfiling.nextCommand() - case RESULT => - AsyncProfiling.cacheExecutorResult(param, executorId) - "" - case _ => "" + archType = AsyncArchUtil.getProcessor("aarch64"); + Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_ARM64, archType); } - } } \ No newline at end of file diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java index 82ab3f7a35..176d9ad119 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java @@ -26,8 +26,13 @@ import org.junit.Test; public class AsyncProfilerTest { @Test - public void testLoaded() { - Assert.assertTrue(AsyncProfiler.getInstance().isLoaded()); + public void testLocalLoaded() { + Assert.assertTrue(AsyncProfiler.getInstance(true).isLoaded()); + } + + @Test + public void testRemoteLoaded() { + Assert.assertTrue(AsyncProfiler.getInstance(false).isLoaded()); } // This may success in local Mac, but failed in CI @@ -36,7 +41,7 @@ public class AsyncProfilerTest { System.setProperty("os.name", "Mac"); String errorMsg = ""; try { - AsyncProfiler.getInstance(); + AsyncProfiler.getInstance(true); } catch (Throwable throwable) { errorMsg = throwable.getMessage(); } @@ -45,7 +50,7 @@ public class AsyncProfilerTest { @Test public void testExecute() throws IOException { - AsyncProfiler asyncProfiler = AsyncProfiler.getInstance(); + AsyncProfiler asyncProfiler = AsyncProfiler.getInstance(true); try { asyncProfiler.execute("start,event=cpu"); asyncProfiler.stop(); @@ -59,12 +64,12 @@ public class AsyncProfilerTest { @Test public void testStop() { Assert.assertThrows("Profiler is not active", IllegalStateException.class, - AsyncProfiler.getInstance()::stop); + AsyncProfiler.getInstance(true)::stop); } @Test public void testAsyncProfilerUtInstance() { - AsyncProfiler originInstance = AsyncProfiler.getInstance(); + AsyncProfiler originInstance = AsyncProfiler.getInstance(true); AsyncProfiler utInstance = AsyncProfiler.utInstance(); Assert.assertSame(originInstance, utInstance); } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java index 85b73a80bc..2cdb832339 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java @@ -40,6 +40,30 @@ public class AsyncProfilerToolTest extends NLocalFileMetadataTestCase { this.cleanupTestMetadata(); } + @Test + public void testLoadLocalAsyncProfilerLib() { + AsyncProfilerTool.loadAsyncProfilerLib(true); + String errorMsg = ""; + try { + AsyncProfilerTool.status(); + } catch (Exception e) { + errorMsg = e.getMessage(); + } + Assert.assertTrue(errorMsg.isEmpty()); + } + + @Test + public void testLoadRemoteAsyncProfilerLib() { + AsyncProfilerTool.loadAsyncProfilerLib(false); + String errorMsg = ""; + try { + AsyncProfilerTool.status(); + } catch (Exception e) { + errorMsg = e.getMessage(); + } + Assert.assertTrue(errorMsg.isEmpty()); + } + @Test public void testStartAndStop() { try { diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java index 7eb636839a..fba1ad54a6 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java @@ -57,6 +57,22 @@ public class AsyncProfilerUtilsTest { Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir); } + @Test + public void testBuildWithNewLocalCacheDir() throws IOException { + AsyncProfilerUtils asyncProfilerUtilsBuild = AsyncProfilerUtils.getInstance(); + asyncProfilerUtilsBuild.build(new CountDownLatch(2)); + Assert.assertEquals(2, asyncProfilerUtilsBuild.cachedResult.getCount()); + + File testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile(); + asyncProfilerUtilsBuild.build(2L, testFile); + Assert.assertEquals(2L, asyncProfilerUtilsBuild.resultCollectionTimeout); + Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir); + + testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile(); + asyncProfilerUtilsBuild.build(testFile); + Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir); + } + @Test public void testWaitForResultTimeout() throws IOException, InterruptedException { AsyncProfilerUtils asyncProfilerUtils = AsyncProfilerUtils.getInstance(); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 22424681c5..494ac3b50e 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -47,7 +47,6 @@ import java.util.stream.Stream; import javax.servlet.http.HttpServletRequest; -import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.IOUtils; @@ -98,6 +97,7 @@ import org.apache.kylin.job.execution.StageBase; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.model.FusionModel; import org.apache.kylin.metadata.model.FusionModelManager; import org.apache.kylin.metadata.model.NDataModel; @@ -1340,6 +1340,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl } public void setResponseLanguage(HttpServletRequest request) { + aclEvaluate.checkIsGlobalAdmin(); String languageToHandle = request.getHeader(HttpHeaders.ACCEPT_LANGUAGE); if (languageToHandle == null) { ErrorCode.setMsg("cn"); diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so new file mode 100644 index 0000000000..b959823506 Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so differ diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so new file mode 100644 index 0000000000..6d961cec0d Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so differ diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so new file mode 100644 index 0000000000..ad45237d5f Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so differ diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index a842c926ca..d31d02f83e 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -18,13 +18,23 @@ package org.apache.kylin.engine.spark.job; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin; -import lombok.val; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -60,26 +70,18 @@ import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin; import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import lombok.val; /** * @@ -883,6 +885,13 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage filePaths.add(kylinConf.getLogSparkAppMasterPropertiesFile()); filePaths.add(kylinConf.getLogSparkDriverPropertiesFile()); filePaths.add(kylinConf.getLogSparkExecutorPropertiesFile()); + if (kylinConf.buildJobProfilingEnabled()) { + try { + filePaths.add(kylinConf.getAsyncProfilerFiles()); + } catch (IOException e) { + logger.error("Add SparkPluginFile failed.", e); + } + } filePaths.add(sparkConf.get(SPARK_FILES_1)); filePaths.add(sparkConf.get(SPARK_FILES_2)); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala index e1dc51869c..d27cfc76f5 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala @@ -25,14 +25,14 @@ import org.apache.kylin.common.exception.{KylinException, QueryErrorCode} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparderEnv -import java.io.OutputStream +import java.io.{File, OutputStream} import java.nio.file.Files import java.util.concurrent.CountDownLatch object AsyncProfiling extends Logging { - private val localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile + var localCacheDir: File = Files.createTempDirectory("ke-async-profiler-result-").toFile localCacheDir.deleteOnExit() private val resultCollectionTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingResultTimeout private val profilingTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingProfileTimeout @@ -57,6 +57,11 @@ object AsyncProfiling extends Logging { throw new KylinException(QueryErrorCode.PROFILING_ALREADY_STARTED, "profiling is already started, stop it first") } logDebug("profiler start") + // Linux may periodically clean up the files in the /tmp directory + if (!localCacheDir.exists()) { + localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile + asyncProfilerUtils.build(localCacheDir) + } asyncProfilerUtils.cleanLocalCache() // expecting driver + count(executor) amount of results cachedResult = new CountDownLatch( diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala index 68f344186b..8af9631561 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala @@ -18,6 +18,7 @@ package org.apache.kylin.query.asyncprofiler +import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, PluginContext} import org.apache.spark.internal.Logging @@ -26,7 +27,11 @@ import java.util class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging { - override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext) + override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { + // Sparder Driver and KE are always in one JVM, in client mode + AsyncProfilerTool.loadAsyncProfilerLib(true) + super.init(sc, pluginContext) + } override def receive(message: Any): AnyRef = { import org.apache.kylin.common.asyncprofiler.Message._ diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala index d94a560dd8..9db5d72000 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala @@ -19,7 +19,6 @@ package org.apache.kylin.query.asyncprofiler import org.apache.kylin.common.util.NLocalFileMetadataTestCase -import org.apache.spark.sql.{SparderEnv, SparkSession} import org.apache.spark.{SparkContext, SparkFunSuite} import org.scalatest.BeforeAndAfterAll @@ -44,11 +43,10 @@ trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll { } protected def clearSparkSession(): Unit = { - if (SparderEnv.isSparkAvailable) { - SparderEnv.getSparkSession.stop() + if (sc != null) { + sc.stop() + sc = null } - SparkSession.setActiveSession(null) - SparkSession.setDefaultSession(null) } override def beforeAll(): Unit = { diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala index c05923664d..21c08c4ee7 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala @@ -24,7 +24,7 @@ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.{SparkConf, SparkContext} import org.mockito.Mockito.mock -import java.io.OutputStream +import java.io.{File, OutputStream} class AsyncProfilingTest extends AsyncPluginWithMeta { @@ -34,55 +34,46 @@ class AsyncProfilingTest extends AsyncPluginWithMeta { val statusFileName: String = flagFileDir + "/status" val dumpFileName: String = flagFileDir + "/dump.tar.gz" - test("init AsyncProfiling") { - AsyncProfiling.asyncProfilerUtils - } - - test("start and dump AsyncProfiling") { + override def beforeAll(): Unit = { + super.beforeAll() val conf = new SparkConf() .setAppName(getClass.getName) .set(SparkLauncher.SPARK_MASTER, "local[1]") .set("spark.plugins", sparkPluginName) sc = new SparkContext(conf) + } + + test("init AsyncProfiling") { + AsyncProfiling.asyncProfilerUtils + } + + test("start and dump AsyncProfiling") { AsyncProfiling.start("") AsyncProfiling.dump("") + } - sc.stop() - sc = null + test("start with localCacheDir by delete") { + AsyncProfiling.nextCommand() + val localCacheDir = AsyncProfiling.localCacheDir + new File(localCacheDir.getAbsolutePath).delete() + AsyncProfiling.start("") + AsyncProfiling.dump("") } test("waitForResult AsyncProfiling") { KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms") - val conf = new SparkConf() - .setAppName(getClass.getName) - .set(SparkLauncher.SPARK_MASTER, "local[1]") - .set("spark.plugins", sparkPluginName) - - - sc = new SparkContext(conf) AsyncProfiling.start("") AsyncProfiling.dump("") AsyncProfiling.waitForResult(mock(classOf[OutputStream])) - - sc.stop() - sc = null } test("cacheExecutorResult AsyncProfiling") { KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms") - val conf = new SparkConf() - .setAppName(getClass.getName) - .set(SparkLauncher.SPARK_MASTER, "local[1]") - .set("spark.plugins", sparkPluginName) - - sc = new SparkContext(conf) AsyncProfiling.start("") AsyncProfiling.cacheExecutorResult("content", "1") - - sc.stop() - sc = null + AsyncProfiling.dump("") } } diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala index 51843bfbbf..8d82c3c684 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala @@ -26,53 +26,36 @@ class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta { val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName - test("plugin initialization") { + override def beforeAll(): Unit = { + super.beforeAll() val conf = new SparkConf() .setAppName(getClass.getName) .set(SparkLauncher.SPARK_MASTER, "local[1]") .set("spark.plugins", sparkPluginName) sc = new SparkContext(conf) + } + + test("plugin initialization") { Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins")) new QueryAsyncProfilerDriverPlugin().receive("NEX-1:start,event=cpu") - - sc.stop() - sc = null } test("plugin initialization receive result") { - val conf = new SparkConf() - .setAppName(getClass.getName) - .set(SparkLauncher.SPARK_MASTER, "local[1]") - .set("spark.plugins", sparkPluginName) - - sc = new SparkContext(conf) Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins")) try { new QueryAsyncProfilerDriverPlugin().receive("RES-1:flamegraph") } catch { case _: Throwable => } - - sc.stop() - sc = null } test("plugin initialization receive others") { - val conf = new SparkConf() - .setAppName(getClass.getName) - .set(SparkLauncher.SPARK_MASTER, "local[1]") - .set("spark.plugins", sparkPluginName) - - sc = new SparkContext(conf) Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins")) try { new QueryAsyncProfilerDriverPlugin().receive("OTH-1:start,event=cpu") } catch { case _: Throwable => } - - sc.stop() - sc = null } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala index bbe900f49d..36497869e2 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala @@ -22,7 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path} import org.apache.kylin.common.asyncprofiler.Message._ import org.apache.kylin.common.asyncprofiler.{AsyncProfilerTool, AsyncProfilerUtils} -import org.apache.kylin.common.util.HadoopUtil +import org.apache.kylin.common.util.{ExecutorServiceUtil, HadoopUtil} import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, PluginContext} import org.apache.spark.internal.Logging @@ -71,7 +71,10 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging { val profile = new Runnable { override def run(): Unit = checkAction() } - log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}") + val deployMode = sc.getConf.get("spark.submit.deployMode", "") + log.info("Current spark.submit.deployMode: {}", deployMode) + AsyncProfilerTool.loadAsyncProfilerLib(deployMode.equals("client")) + log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}") scheduledExecutorService.scheduleWithFixedDelay( profile, 0, checkingInterval, TimeUnit.MILLISECONDS) @@ -129,6 +132,8 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging { override def shutdown(): Unit = { val fs: FileSystem = HadoopUtil.getFileSystem(statusFileName) HadoopUtil.writeStringToHdfs(fs, ProfilerStatus.CLOSED, statusFileName) + ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3) + super.shutdown() } def start(params: String): Unit = {