This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 3.0.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 753e1a85cb7d5cde660e66ae11449bca0b786883 Author: Xiaoxiang Yu <[email protected]> AuthorDate: Tue Dec 17 18:15:19 2019 +0800 KYLIN-4197 Set deadline for ClientEnvExtractor (#994) * KYLIN-4197 Set deadline for ClientEnvExtractor * Add configuration for lambda mode. --- build/bin/kylin-port-replace-util.sh | 2 +- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../rest/controller/StreamingV2Controller.java | 11 +++- .../org/apache/kylin/tool/ClientEnvExtractor.java | 52 ++++++++++++----- .../apache/kylin/tool/ClientEnvExtractorTest.java | 68 ++++++++++++++++++++++ 5 files changed, 119 insertions(+), 18 deletions(-) diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh index 0d6f005..28a53db 100755 --- a/build/bin/kylin-port-replace-util.sh +++ b/build/bin/kylin-port-replace-util.sh @@ -89,7 +89,7 @@ then #replace kylin.stream.node for Streaming Coordinator stream_node="kylin.stream.node=`hostname -f`:$new_kylin_port" echo "Using new kylin.stream.node: $stream_node" - line_count=$(awk '$0 ~ /kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l) + line_count=$(awk '$0 ~ /^kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l) if [[ $line_count -eq 0 ]]; then echo "kylin.stream.node=`hostname -f`:7070" >> ${KYLIN_CONFIG_FILE} fi diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d44d944..4c8d437 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2289,6 +2289,10 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true")); } + public String getHiveDatabaseLambdaCube() { + return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT); + } + // ============================================================================ // Health Check CLI // ============================================================================ diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java index cfd7086..846616e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java @@ -188,9 +188,11 @@ public class StreamingV2Controller extends BasicController { // validate the compatibility for input table schema and the underline hive table schema if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) { List<FieldSchema> fields; + String db = tableDesc.getDatabase(); try { HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf()); - fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName()); + fields = metaStoreClient.getFields(db, tableDesc.getName()); + logger.info("Checking the {} in {}", tableDesc.getName(), db); } catch (NoSuchObjectException noObjectException) { logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(), noObjectException); @@ -209,10 +211,12 @@ public class StreamingV2Controller extends BasicController { for (ColumnDesc columnDesc : tableDesc.getColumns()) { FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT)); if (fieldSchema == null) { + // Partition column cannot be fetched via Hive Metadata API. if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) { - incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName()); + incompatibleMsgs.add("Column not exist in hive table:" + columnDesc.getName()); continue; } else { + logger.info("Column not exist in hive table: {}.", columnDesc.getName()); continue; } } @@ -486,6 +490,7 @@ public class StreamingV2Controller extends BasicController { private TableDesc deserializeTableDesc(StreamingRequestV2 streamingRequest) { TableDesc desc = null; + String db = KylinConfig.getInstanceFromEnv().getHiveDatabaseLambdaCube(); try { logger.debug("Saving TableDesc " + streamingRequest.getTableData()); desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class); @@ -502,7 +507,7 @@ public class StreamingV2Controller extends BasicController { String[] dbTable = HadoopUtil.parseHiveTableName(desc.getName()); desc.setName(dbTable[1]); - desc.setDatabase(dbTable[0]); + desc.setDatabase(db); desc.getIdentity(); return desc; } diff --git a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java index 78d02c5..80741ea 100644 --- a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java @@ -22,6 +22,12 @@ package org.apache.kylin.tool; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -38,10 +44,12 @@ public class ClientEnvExtractor extends AbstractInfoExtractor { private static final Logger logger = LoggerFactory.getLogger(ClientEnvExtractor.class); private KylinConfig kylinConfig; private CliCommandExecutor cmdExecutor; + private ExecutorService executorService; + int maxWaitSeconds = 120; public ClientEnvExtractor() throws IOException { super(); - + executorService = Executors.newFixedThreadPool(1); packageType = "client"; kylinConfig = KylinConfig.getInstanceFromEnv(); cmdExecutor = kylinConfig.getCliCommandExecutor(); @@ -64,6 +72,7 @@ public class ClientEnvExtractor extends AbstractInfoExtractor { addShellOutput("hbase version", "hbase", "version"); addShellOutput("hive --version", "hive", "version"); addShellOutput("beeline --version", "hive", "beeline_version"); + executorService.shutdownNow(); } private void addLocalFile(String src, String destDir) { @@ -83,20 +92,35 @@ public class ClientEnvExtractor extends AbstractInfoExtractor { } } - private void addShellOutput(String cmd, String destDir, String filename) { - try { - File destDirFile = null; - if (!StringUtils.isEmpty(destDir)) { - destDirFile = new File(exportDir, destDir); - FileUtils.forceMkdir(destDirFile); - } else { - destDirFile = exportDir; + void addShellOutput(String cmd, String destDir, String filename) { + Future f = executorService.submit(() -> { + try { + File destDirFile = null; + if (!StringUtils.isEmpty(destDir)) { + destDirFile = new File(exportDir, destDir); + FileUtils.forceMkdir(destDirFile); + } else { + destDirFile = exportDir; + } + Pair<Integer, String> result = cmdExecutor.execute(cmd); + String output = result.getSecond(); + FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset()); + } catch (IOException e) { + logger.warn("Failed to run command: " + cmd + ".", e); } - Pair<Integer, String> result = cmdExecutor.execute(cmd); - String output = result.getSecond(); - FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset()); - } catch (Exception e) { - logger.warn("Failed to run command: " + cmd + ".", e); + }); + + try { + // assume most shell should return in two minutes + f.get(maxWaitSeconds, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + logger.error("Timeout for \"{}\" in {} seconds.", cmd, maxWaitSeconds); + executorService.shutdownNow(); + executorService = Executors.newFixedThreadPool(1); + } catch (ExecutionException runtimeException) { + logger.error("Runtime error: {}", runtimeException.getLocalizedMessage()); + } catch (InterruptedException otherException) { + // Ignore } } } diff --git a/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java new file mode 100644 index 0000000..d2b31b0 --- /dev/null +++ b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.kylin.tool; + +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class ClientEnvExtractorTest extends HBaseMetadataTestCase { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() throws Exception { + super.createTestMetadata(); + } + + @After + public void after() throws Exception { + super.cleanupTestMetadata(); + } + + @Test + public void testNormal() throws IOException { + File f = folder.newFolder("ClientEnvExtractorTest_testNormal"); + ClientEnvExtractor executor = new ClientEnvExtractor(); + executor.addShellOutput("pwd", f.getAbsolutePath(), "testNormal"); + } + + @Test(timeout = 5000) + public void testTimeout() throws IOException { + File f = folder.newFolder("ClientEnvExtractorTest_testTimeout"); + ClientEnvExtractor executor = new ClientEnvExtractor(); + executor.maxWaitSeconds = 2; + executor.addShellOutput("sleep 1000", f.getAbsolutePath(), "testTimeout"); + executor.addShellOutput("pwd", f.getAbsolutePath(), "pwd"); + } + + @Test + public void testError() throws IOException { + File f = folder.newFolder("ClientEnvExtractorTest_testError"); + ClientEnvExtractor executor = new ClientEnvExtractor(); + executor.addShellOutput("CMD_NEVER_EXISTS", f.getAbsolutePath(), "testError"); + } +} \ No newline at end of file
