This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push: new ea34ffc KYLIN-3896 Implement IFlinkOutput based on HBase ea34ffc is described below commit ea34ffceaef91beba81a95087ab998e6a2f736c4 Author: yanghua <yanghua1...@gmail.com> AuthorDate: Wed Mar 20 13:47:52 2019 +0800 KYLIN-3896 Implement IFlinkOutput based on HBase --- storage-hbase/pom.xml | 5 ++ .../apache/kylin/storage/hbase/HBaseStorage.java | 4 + .../hbase/steps/HBaseFlinkOutputTransition.java | 97 ++++++++++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml index 0403700..6f38ed6 100644 --- a/storage-hbase/pom.xml +++ b/storage-hbase/pom.xml @@ -52,6 +52,11 @@ <artifactId>kylin-engine-spark</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-flink</artifactId> + </dependency> + <!-- Env & Test --> <dependency> <groupId>org.apache.kylin</groupId> diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java index ded6598..2a30444 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java @@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.engine.flink.IFlinkOutput; import org.apache.kylin.engine.mr.IMROutput2; import org.apache.kylin.engine.spark.ISparkOutput; import org.apache.kylin.metadata.model.DataModelDesc; @@ -31,6 +32,7 @@ import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.storage.IStorage; import org.apache.kylin.storage.IStorageQuery; +import org.apache.kylin.storage.hbase.steps.HBaseFlinkOutputTransition; import 
org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition; import org.apache.kylin.storage.hbase.steps.HBaseSparkOutputTransition; @@ -89,6 +91,8 @@ public class HBaseStorage implements IStorage { return (I) new HBaseMROutput2Transition(); } else if (engineInterface == ISparkOutput.class) { return (I) new HBaseSparkOutputTransition(); + } else if (engineInterface == IFlinkOutput.class) { + return (I) new HBaseFlinkOutputTransition(); } else { throw new RuntimeException("Cannot adapt to " + engineInterface); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java new file mode 100644 index 0000000..a2a05c5 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.storage.hbase.steps; + +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.flink.IFlinkOutput; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * This "Transition" impl generates cuboid files and then convert to HFile. + * The additional step slows down build process, but the gains is merge + * can read from HDFS instead of over HBase region server. See KYLIN-1007. + * + * This is transitional because finally we want to merge from HTable snapshot. + * However multiple snapshots as MR input is only supported by HBase 1.x. + * Before most users upgrade to latest HBase, they can only use this transitional + * cuboid file solution. + */ +public class HBaseFlinkOutputTransition implements IFlinkOutput { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(HBaseFlinkOutputTransition.class); + + @Override + public IFlinkBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) { + final HBaseMRSteps steps = new HBaseMRSteps(seg); + + return new IFlinkBatchCubingOutputSide() { + + @Override + public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId())); + } + + @Override + public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); + jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); + } + + @Override + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + // nothing to do + } + + }; + } + + @Override + public IFlinkBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) { + return new IFlinkBatchMergeOutputSide() { + final HBaseMRSteps steps = new HBaseMRSteps(seg); + + @Override + public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) { + jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId())); + } + + @Override + public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, + DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); + jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); + } + + @Override + public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) { + steps.addMergingGarbageCollectionSteps(jobFlow); + } + + }; + } + + public IFlinkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) { + return null; + } +} \ No newline at end of file