Revert "KYLIN-1762 discard job when no stream message"

This reverts commit 1108d9eeccecbccffea0b3f9049151672196c91a.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/da5ba276
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/da5ba276
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/da5ba276

Branch: refs/heads/master
Commit: da5ba276b972f8b3c0d220252e74ac2ff73298fc
Parents: bec25b4
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 19 23:50:20 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800

----------------------------------------------------------------------
 .../job/execution/DefaultChainedExecutable.java |  6 ---
 .../kylin/source/kafka/SeekOffsetStep.java      | 45 +++++---------------
 2 files changed, 10 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/da5ba276/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
 
b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..753b389 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -88,7 +88,6 @@ public class DefaultChainedExecutable extends 
AbstractExecutable implements Chai
             boolean allSucceed = true;
             boolean hasError = false;
             boolean hasRunning = false;
-            boolean hasDiscarded = false;
             for (Executable task : jobs) {
                 final ExecutableState status = task.getStatus();
                 if (status == ExecutableState.ERROR) {
@@ -100,9 +99,6 @@ public class DefaultChainedExecutable extends 
AbstractExecutable implements Chai
                 if (status == ExecutableState.RUNNING) {
                     hasRunning = true;
                 }
-                if (status == ExecutableState.DISCARDED) {
-                    hasDiscarded = true;
-                }
             }
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
@@ -114,8 +110,6 @@ public class DefaultChainedExecutable extends 
AbstractExecutable implements Chai
                 notifyUserStatusChange(executableContext, 
ExecutableState.ERROR);
             } else if (hasRunning) {
                 jobService.updateJobOutput(getId(), ExecutableState.RUNNING, 
null, null);
-            } else if (hasDiscarded) {
-                jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, 
null, null);
             } else {
                 jobService.updateJobOutput(getId(), ExecutableState.READY, 
null, null);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/da5ba276/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 479f1b8..5dca93f 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,10 +17,6 @@
 */
 package org.apache.kylin.source.kafka;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Maps;
-import org.apache.commons.math3.util.MathUtils;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +34,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -106,39 +101,19 @@ public class SeekOffsetStep extends AbstractExecutable {
             }
         }
 
-        long totalStartOffset = 0, totalEndOffset = 0;
-        for (Long v : startOffsets.values()) {
-            totalStartOffset += v;
-        }
-        for (Long v : endOffsets.values()) {
-            totalEndOffset += v;
-        }
+        KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+        KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
 
-        if (totalEndOffset > totalStartOffset) {
-            KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
-            KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
-            segment.setName(CubeSegment.makeSegmentName(0, 0, 
totalStartOffset, totalEndOffset));
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
-            }
+        segment.setName(CubeSegment.makeSegmentName(0, 0, 
segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } else {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToRemoveSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
-            }
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new 
message comes");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
         }
-
-
     }
 
 }

Reply via email to