[hotfix] Replace registerTimer/getTime by TimeServiceProvider in Context

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a5048b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a5048b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a5048b

Branch: refs/heads/master
Commit: 51a5048b24ffe7655e2197c04aa844239bf1af83
Parents: ffff299
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Sep 23 10:40:16 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 23 15:01:07 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 23 +++++++------
 .../kafka/internals/AbstractFetcher.java        | 19 ++++++-----
 .../kafka/testutils/MockRuntimeContext.java     | 11 ++----
 .../api/operators/StreamingRuntimeContext.java  |  7 ++--
 .../runtime/tasks/AsyncExceptionHandler.java    |  8 ++---
 .../tasks/DefaultTimeServiceProvider.java       | 35 +++++++++-----------
 .../streaming/runtime/tasks/StreamTask.java     |  6 ++--
 .../runtime/operators/TimeProviderTest.java     | 13 ++++++--
 .../util/OneInputStreamOperatorTestHarness.java |  2 +-
 9 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 1e05c0d..5a5cade 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -284,6 +285,8 @@ public class BucketingSink<T>
 
        private transient Clock clock;
 
+       private transient TimeServiceProvider processingTimeService;
+
        /**
         * Creates a new {@code BucketingSink} that writes files to the given 
base directory.
         *
@@ -320,18 +323,19 @@ public class BucketingSink<T>
                FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
                refTruncate = reflectTruncate(fs);
 
-               long currentProcessingTime =
-                               ((StreamingRuntimeContext) 
getRuntimeContext()).getCurrentProcessingTime();
+               processingTimeService =
+                               ((StreamingRuntimeContext) 
getRuntimeContext()).getTimeServiceProvider();
+
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
 
                checkForInactiveBuckets(currentProcessingTime);
 
-               ((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
-                               currentProcessingTime + 
inactiveBucketCheckInterval, this);
+               processingTimeService.registerTimer(currentProcessingTime + 
inactiveBucketCheckInterval, this);
 
                this.clock = new Clock() {
                        @Override
                        public long currentTimeMillis() {
-                               return ((StreamingRuntimeContext) 
getRuntimeContext()).getCurrentProcessingTime();
+                               return 
processingTimeService.getCurrentProcessingTime();
                        }
                };
 
@@ -376,8 +380,7 @@ public class BucketingSink<T>
        public void invoke(T value) throws Exception {
                Path bucketPath = bucketer.getBucketPath(clock, new 
Path(basePath), value);
 
-               long currentProcessingTime =
-                               ((StreamingRuntimeContext) 
getRuntimeContext()).getCurrentProcessingTime();
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
 
                if (!state.hasBucketState(bucketPath)) {
                        state.addBucketState(bucketPath, new 
BucketState<T>(currentProcessingTime));
@@ -420,13 +423,11 @@ public class BucketingSink<T>
 
        @Override
        public void trigger(long timestamp) throws Exception {
-               long currentProcessingTime =
-                               ((StreamingRuntimeContext) 
getRuntimeContext()).getCurrentProcessingTime();
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
 
                checkForInactiveBuckets(currentProcessingTime);
 
-               ((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
-                               currentProcessingTime + 
inactiveBucketCheckInterval, this);
+               processingTimeService.registerTimer(currentProcessingTime + 
inactiveBucketCheckInterval, this);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8ec26cc..9255445 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -80,7 +81,8 @@ public abstract class AbstractFetcher<T, KPH> {
                        List<KafkaTopicPartition> assignedPartitions,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext, boolean 
useMetrics) throws Exception
+                       StreamingRuntimeContext runtimeContext,
+                       boolean useMetrics) throws Exception
        {
                this.sourceContext = checkNotNull(sourceContext);
                this.checkpointLock = sourceContext.getCheckpointLock();
@@ -116,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> {
                                        
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
                        
                        PeriodicWatermarkEmitter periodicEmitter = 
-                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext);
+                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext.getTimeServiceProvider(), 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
                        periodicEmitter.start();
                }
        }
@@ -458,7 +460,7 @@ public abstract class AbstractFetcher<T, KPH> {
                
                private final SourceContext<?> emitter;
                
-               private final StreamingRuntimeContext triggerContext;
+               private final TimeServiceProvider timerService;
 
                private final long interval;
                
@@ -469,19 +471,20 @@ public abstract class AbstractFetcher<T, KPH> {
                PeriodicWatermarkEmitter(
                                
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
                                SourceContext<?> emitter,
-                               StreamingRuntimeContext runtimeContext)
+                               TimeServiceProvider timerService,
+                               long autoWatermarkInterval)
                {
                        this.allPartitions = checkNotNull(allPartitions);
                        this.emitter = checkNotNull(emitter);
-                       this.triggerContext = checkNotNull(runtimeContext);
-                       this.interval = 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+                       this.timerService = checkNotNull(timerService);
+                       this.interval = autoWatermarkInterval;
                        this.lastWatermarkTimestamp = Long.MIN_VALUE;
                }
 
                //-------------------------------------------------
                
                public void start() {
-                       
triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + 
interval, this);
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
                }
                
                @Override
@@ -510,7 +513,7 @@ public abstract class AbstractFetcher<T, KPH> {
                        }
                        
                        // schedule the next watermark
-                       
triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + 
interval, this);
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 7a50569..da2c652 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -198,15 +198,8 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        }
 
        @Override
-       public long getCurrentProcessingTime() {
-               Preconditions.checkNotNull(timerService, "The processing time 
timer has not been initialized.");
-               return timerService.getCurrentProcessingTime();
-       }
-
-       @Override
-       public ScheduledFuture<?> registerTimer(final long time, final 
Triggerable target) {
-               Preconditions.checkNotNull(timerService, "The processing time 
timer has not been initialized.");
-               return timerService.registerTimer(time, target);
+       public TimeServiceProvider getTimeServiceProvider() {
+               return timerService;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 961bd9d..4f85e3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -35,11 +35,10 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
 
 import static java.util.Objects.requireNonNull;
 
@@ -83,6 +82,10 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
                return taskEnvironment.getInputSplitProvider();
        }
 
+       public TimeServiceProvider getTimeServiceProvider() {
+               return operator.getTimerService();
+       }
+
        // 
------------------------------------------------------------------------
        //  broadcast variables
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index 85a4115..c7ec2ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -18,14 +18,12 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * An interface marking a task as capable to register exceptions thrown by 
different
- * threads, other than the one executing the taks itself.
+ * Interface for reporting exceptions that are thrown in (possibly) a 
different thread.
  */
 public interface AsyncExceptionHandler {
 
        /**
-        * Registers to the main thread an exception that was thrown by another 
thread
-        * (e.g. a TriggerTask), other than the one executing the main task.
+        * Registers the given exception.
         */
-       void registerAsyncException(String message, AsynchronousException 
exception);
+       void registerAsyncException(AsynchronousException exception);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index c7339b3..ea2b07f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -39,10 +39,11 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        /** The executor service that schedules and calls the triggers of this 
task*/
        private final ScheduledExecutorService timerService;
 
-       public static DefaultTimeServiceProvider create(AsyncExceptionHandler 
task,
-                                                                               
                        ScheduledExecutorService executor,
-                                                                               
                        Object checkpointLock) {
-               return new DefaultTimeServiceProvider(task, executor, 
checkpointLock);
+       public static DefaultTimeServiceProvider create(
+                       AsyncExceptionHandler exceptionHandler,
+                       ScheduledExecutorService executor,
+                       Object checkpointLock) {
+               return new DefaultTimeServiceProvider(exceptionHandler, 
executor, checkpointLock);
        }
 
        private DefaultTimeServiceProvider(AsyncExceptionHandler task,
@@ -82,10 +83,10 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                private final Object lock;
                private final Triggerable target;
                private final long timestamp;
-               private final AsyncExceptionHandler task;
+               private final AsyncExceptionHandler exceptionHandler;
 
-               TriggerTask(AsyncExceptionHandler task, final Object lock, 
Triggerable target, long timestamp) {
-                       this.task = task;
+               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, Triggerable target, long timestamp) {
+                       this.exceptionHandler = exceptionHandler;
                        this.lock = lock;
                        this.target = target;
                        this.timestamp = timestamp;
@@ -97,17 +98,8 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                                try {
                                        target.trigger(timestamp);
                                } catch (Throwable t) {
-
-                                       if (task != null) {
-                                               // registers the exception with 
the calling task
-                                               // so that it can be logged and 
(later) detected
-                                               TimerException asyncException = 
new TimerException(t);
-                                               
task.registerAsyncException("Caught exception while processing timer.", 
asyncException);
-                                       } else {
-                                               // this is for when we are in 
testing mode and we
-                                               // want to have real processing 
time.
-                                               t.printStackTrace();
-                                       }
+                                       TimerException asyncException = new 
TimerException(t);
+                                       
exceptionHandler.registerAsyncException(asyncException);
                                }
                        }
                }
@@ -115,6 +107,11 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
 
        @VisibleForTesting
        public static DefaultTimeServiceProvider 
createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
-               return new DefaultTimeServiceProvider(null, executor, 
checkpointLock);
+               return new DefaultTimeServiceProvider(new 
AsyncExceptionHandler() {
+                       @Override
+                       public void 
registerAsyncException(AsynchronousException exception) {
+                               exception.printStackTrace();
+                       }
+               }, executor, checkpointLock);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index faa9672..ff074b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -838,9 +838,9 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public void registerAsyncException(String message, 
AsynchronousException exception) {
+       public void registerAsyncException(AsynchronousException exception) {
                if (isRunning) {
-                       LOG.error(message, exception);
+                       LOG.error("Asynchronous exception registered.", 
exception);
                }
 
                if (this.asyncException == null) {
@@ -940,7 +940,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
                                // registers the exception and tries to fail 
the whole task
                                AsynchronousException asyncException = new 
AsynchronousException(e);
-                               owner.registerAsyncException("Caught exception 
while materializing asynchronous checkpoints.", asyncException);
+                               owner.registerAsyncException(asyncException);
                        }
                        finally {
                                synchronized (cancelables) {

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 140e9e2..60850d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
@@ -50,8 +52,15 @@ public class TimeProviderTest {
                final OneShotLatch latch = new OneShotLatch();
 
                final Object lock = new Object();
-               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
-                       
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider.create(
+                               new AsyncExceptionHandler() {
+                                       @Override
+                                       public void 
registerAsyncException(AsynchronousException exception) {
+                                               exception.printStackTrace();
+                                       }
+                               },
+                               Executors.newSingleThreadScheduledExecutor(),
+                               lock);
 
                final List<Long> timestamps = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9cdc783..acf046a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -135,7 +135,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                                // do nothing
                                return null;
                        }
-               }).when(mockTask).registerAsyncException(any(String.class), 
any(AsynchronousException.class));
+               
}).when(mockTask).registerAsyncException(any(AsynchronousException.class));
 
                try {
                        doAnswer(new Answer<CheckpointStreamFactory>() {

Reply via email to