Revert "refactor BuildCubeWithStream"

This reverts commit a08dd2e03900b321617647d1dbf1c4ee8b4b18c2.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8e9c4550
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8e9c4550
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8e9c4550

Branch: refs/heads/master
Commit: 8e9c4550bb562b497442b17eec6485ae96e848d8
Parents: a6cd409
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 19 23:49:18 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |   7 +-
 .../kylin/provision/BuildCubeWithStream.java    |  10 +-
 .../kylin/provision/BuildCubeWithStream2.java   | 145 ++++++++++++++++++-
 3 files changed, 150 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8e9c4550/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..9b282e3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -187,7 +187,6 @@ public class DeployUtil {
         File tmpFile = File.createTempFile(factTableName, "csv");
         FileOutputStream out = new FileOutputStream(tmpFile);
 
-        InputStream tempIn = null;
         try {
             if (store.exists(factTablePath)) {
                 InputStream oldContent = store.getResource(factTablePath).inputStream;
@@ -195,15 +194,13 @@ public class DeployUtil {
             }
             IOUtils.copy(in, out);
             IOUtils.closeQuietly(in);
-            IOUtils.closeQuietly(out);
 
             store.deleteResource(factTablePath);
-            tempIn = new FileInputStream(tmpFile);
-            store.putResource(factTablePath, tempIn, System.currentTimeMillis());
+            in = new FileInputStream(tmpFile);
+            store.putResource(factTablePath, in, System.currentTimeMillis());
         } finally {
             IOUtils.closeQuietly(out);
             IOUtils.closeQuietly(in);
-            IOUtils.closeQuietly(tempIn);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8e9c4550/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index bfe1d0a..6e5313f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,10 +62,10 @@ public class BuildCubeWithStream {
 
     private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
 
-    protected CubeManager cubeManager;
+    private CubeManager cubeManager;
     private DefaultScheduler scheduler;
     protected ExecutableManager jobService;
-    static final String cubeName = "test_streaming_table_cube";
+    private static final String cubeName = "test_streaming_table_cube";
 
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
@@ -114,13 +114,13 @@ public class BuildCubeWithStream {
         Assert.assertEquals(topicName, topicMetadata.topic());
     }
 
-    protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+    private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
         Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
         DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
         logger.info("Test data inserted into Kafka");
     }
 
-    protected void clearSegment(String cubeName) throws Exception {
+    private void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
         // remove all existing segments
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -187,7 +187,7 @@ public class BuildCubeWithStream {
         return job.getId();
     }
 
-    protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+    private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/8e9c4550/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
index 7959701..2812446 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.provision;
 
+import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Random;
 import java.util.TimeZone;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -30,9 +32,32 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.streaming.Kafka10DataLoader;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +67,79 @@ import static java.lang.Thread.sleep;
 /**
  *  for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently.
  */
-public class BuildCubeWithStream2 extends BuildCubeWithStream {
+public class BuildCubeWithStream2 {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
+
+    private CubeManager cubeManager;
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+    private static final String cubeName = "test_streaming_table_cube";
+
+    private KafkaConfig kafkaConfig;
+    private MockKafka kafkaServer;
     private static boolean generateData = true;
 
-    @Override
+    public void before() throws Exception {
+        deployEnv();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.createInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final String factTable = cubeInstance.getFactTable();
+
+        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+        final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
+        kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
+
+        String topicName = UUID.randomUUID().toString();
+        String localIp = NetworkUtils.getLocalIp();
+        BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
+        brokerConfig.setHost(localIp);
+        kafkaConfig.setTopic(topicName);
+        KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
+
+        startEmbeddedKafka(topicName, brokerConfig);
+    }
+
+    private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
+        //Start mock Kakfa
+        String zkConnectionStr = "sandbox:2181";
+        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+        // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
+        kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
+        kafkaServer.start();
+
+        kafkaServer.createTopic(topicName, 3, 1);
+        kafkaServer.waitTopicUntilReady(topicName);
+
+        MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
+        Assert.assertEquals(topicName, topicMetadata.topic());
+    }
+
+    private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+        if (numberOfRecords <= 0)
+            return;
+        Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
+        logger.info("Test data inserted into Kafka");
+    }
+
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        // remove all existing segments
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cubeManager.updateCube(cubeBuilder);
+    }
+
     public void build() throws Exception {
         clearSegment(cubeName);
         SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
@@ -112,6 +204,55 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream {
 
     }
 
+
+    private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getStatus();
+    }
+
+    protected void deployEnv() throws IOException {
+        DeployUtil.overrideJobJarLocations();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+    }
+
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
+            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+    }
+
+    public static void afterClass() throws Exception {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    public void after() {
+        kafkaServer.stop();
+        DefaultScheduler.destroyInstance();
+    }
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
+                break;
+            } else {
+                try {
+                    sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         try {
             beforeClass();

Reply via email to