This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch engine-flink in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push: new 5eac35a KYLIN-3897 Implement IFlinkInput based on Hive 5eac35a is described below commit 5eac35a3bcc534125ef12c936a5e12f56a889921 Author: yanghua <yanghua1...@gmail.com> AuthorDate: Wed Mar 20 14:10:49 2019 +0800 KYLIN-3897 Implement IFlinkInput based on Hive --- source-hive/pom.xml | 4 + .../apache/kylin/source/hive/HiveFlinkInput.java | 92 ++++++++++++++++++++++ .../org/apache/kylin/source/hive/HiveSource.java | 3 + 3 files changed, 99 insertions(+) diff --git a/source-hive/pom.xml b/source-hive/pom.xml index e242f0a..6cc15fa 100644 --- a/source-hive/pom.xml +++ b/source-hive/pom.xml @@ -105,6 +105,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-spark</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-flink</artifactId> + </dependency> </dependencies> </project> diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveFlinkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveFlinkInput.java new file mode 100644 index 0000000..f3dbf73 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveFlinkInput.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.hive; + +import com.google.common.collect.Lists; +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.engine.flink.IFlinkInput; +import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +/** + * The implementation of {@link IFlinkInput} based on Hive. + * + * <p>Hands out a {@link BatchCubingInputSide} for cubing jobs and, for merge jobs, an input side + * whose dictionary-merge step adds no task to the job flow. + */ +public class HiveFlinkInput extends HiveInputBase implements IFlinkInput { + + /** Not referenced elsewhere in this class, hence the suppressed "unused" warning. */ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(HiveFlinkInput.class); + + /** + * Creates the cubing input side for a joined flat table. + * + * @param flatDesc flat table description handed to the {@link BatchCubingInputSide} constructor + * @return a new {@link BatchCubingInputSide} + */ + @Override + public IFlinkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + return new BatchCubingInputSide(flatDesc); + } + + /** + * Creates the merge input side. + * + * @param seg not read by this implementation + * @return an input side whose {@code addStepPhase1_MergeDictionary} leaves the job flow untouched + */ + @Override + public IFlinkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { + return new IFlinkBatchMergeInputSide() { + @Override + public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { + // doing nothing + } + }; + } + + /** + * Cubing input side that adds the lookup-table materialization step and the Hive cleanup step + * to a job flow. + * + * <p>NOTE(review): {@code flatDesc}, {@code flatTableDatabase} and {@code hdfsWorkingDir} are not + * declared in this class; presumably they are inherited from {@code BaseBatchCubingInputSide} - confirm there. + */ + public class BatchCubingInputSide extends BaseBatchCubingInputSide implements IFlinkBatchCubingInputSide { + + /** + * Intermediate tables created for Hive views. The list is passed to + * {@code createLookupHiveViewMaterializationStep} and later joined with commas for the cleanup step. + * NOTE(review): presumably that helper fills the list - confirm in the base class. + */ + List<String> hiveViewIntermediateTables = Lists.newArrayList(); + + /** + * @param flatDesc flat table description, forwarded unchanged to the superclass constructor + */ + public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + super(flatDesc); + } + + /** + * Adds the lookup Hive view materialization task to the job flow, if one is created. + * + * @param jobFlow job flow that receives the task; also supplies the job id and the job working dir + */ + protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); + + AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, + flatDesc, hiveViewIntermediateTables, jobFlow.getId()); + /* Skip when no materialization task was created (the helper returned null). */ + if (task != null) { + jobFlow.addTask(task); + } + } + + /** + * Appends a {@code GarbageCollectionStep} to the job flow. The step is given the intermediate + * table identity, the flat table directory under the job working dir, and the comma-joined + * {@code hiveViewIntermediateTables}. + * + * @param jobFlow job flow that receives the cleanup step + */ + @Override + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); + + GarbageCollectionStep step = new GarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); + step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity())); + step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir))); + step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ",")); + jobFlow.addTask(step); + } + + } + +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index b536bf0..353cf1d 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -21,6 +21,7 @@ package org.apache.kylin.source.hive; import java.io.IOException; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.flink.IFlinkInput; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.spark.ISparkInput; import org.apache.kylin.metadata.model.IBuildable; @@ -48,6 +49,8 @@ public class HiveSource implements ISource { return (I) new HiveMRInput(); } else if (engineInterface == ISparkInput.class) { return (I) new HiveSparkInput(); + } else if (engineInterface == IFlinkInput.class) { + return (I) new HiveFlinkInput(); } else { throw new RuntimeException("Cannot adapt to " + engineInterface); }