[hotfix] Expose AllocationID as string through TaskInfo

(cherry picked from commit edece9c)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbe3cbfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbe3cbfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbe3cbfa

Branch: refs/heads/release-1.5
Commit: fbe3cbfae9d08f8e5f9c21f0f075944b239cb50c
Parents: eba462e
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 10:07:52 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/common/TaskInfo.java   | 36 ++++++++++++++++++--
 .../util/AbstractRuntimeUDFContext.java         |  7 ++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
 
        private final String taskName;
        private final String taskNameWithSubtasks;
+       private final String allocationIDAsString;
        private final int maxNumberOfParallelSubtasks;
        private final int indexOfSubtask;
        private final int numberOfParallelSubtasks;
        private final int attemptNumber;
 
-       public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+       public TaskInfo(
+               String taskName,
+               int maxNumberOfParallelSubtasks,
+               int indexOfSubtask,
+               int numberOfParallelSubtasks,
+               int attemptNumber) {
+               this(
+                       taskName,
+                       maxNumberOfParallelSubtasks,
+                       indexOfSubtask,
+                       numberOfParallelSubtasks,
+                       attemptNumber,
+                       "UNKNOWN");
+       }
+
+       public TaskInfo(
+               String taskName,
+               int maxNumberOfParallelSubtasks,
+               int indexOfSubtask,
+               int numberOfParallelSubtasks,
+               int attemptNumber,
+               String allocationIDAsString) {
+
                checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
                checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
                checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
                this.numberOfParallelSubtasks = numberOfParallelSubtasks;
                this.attemptNumber = attemptNumber;
                this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+               this.allocationIDAsString = checkNotNull(allocationIDAsString);
        }
 
        /**
@@ -107,4 +131,12 @@ public class TaskInfo {
        public String getTaskNameWithSubtasks() {
                return this.taskNameWithSubtasks;
        }
+
+       /**
+        * Returns the allocation id for where this task is executed.
+        * @return the allocation id for where this task is executed.
+        */
+       public String getAllocationIDAsString() {
+               return allocationIDAsString;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.functions.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -239,4 +240,10 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions executed on a KeyedStream");
        }
+
+       @Internal
+       @VisibleForTesting
+       public String getAllocationIDAsString() {
+               return taskInfo.getAllocationIDAsString();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d99472b..43e5c82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -310,7 +310,8 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
                                taskInformation.getMaxNumberOfSubtaks(),
                                subtaskIndex,
                                taskInformation.getNumberOfSubtasks(),
-                               attemptNumber);
+                               attemptNumber,
+                               String.valueOf(slotAllocationId));
 
                this.jobId = jobInformation.getJobId();
                this.vertexId = taskInformation.getJobVertexId();

Reply via email to