AHeise commented on a change in pull request #16399:
URL: https://github.com/apache/flink/pull/16399#discussion_r682820877



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java
##########
@@ -145,29 +150,60 @@ public Long timestamp() {
 
     private static class InitContextImpl implements Sink.InitContext {
 
-        private final int subtaskIdx;
-
         private final ProcessingTimeService processingTimeService;
 
+        private final MailboxExecutor mailboxExecutor;
+
         private final MetricGroup metricGroup;
 
+        private final StreamingRuntimeContext runtimeContext;
+
         public InitContextImpl(
-                int subtaskIdx,
+                StreamingRuntimeContext runtimeContext,
                 ProcessingTimeService processingTimeService,
+                MailboxExecutor mailboxExecutor,
                 MetricGroup metricGroup) {
-            this.subtaskIdx = subtaskIdx;
+            this.runtimeContext = checkNotNull(runtimeContext);
+            this.mailboxExecutor = checkNotNull(mailboxExecutor);
             this.processingTimeService = checkNotNull(processingTimeService);
             this.metricGroup = checkNotNull(metricGroup);
         }
 
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return new UserCodeClassLoader() {
+                @Override
+                public ClassLoader asClassLoader() {
+                    return runtimeContext.getUserCodeClassLoader();
+                }
+
+                @Override
+                public void registerReleaseHookIfAbsent(
+                        String releaseHookName, Runnable releaseHook) {
+                    runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(
+                            releaseHookName, releaseHook);
+                }
+            };
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return runtimeContext.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return mailboxExecutor;
+        }
+
         @Override
         public Sink.ProcessingTimeService getProcessingTimeService() {
             return new ProcessingTimerServiceImpl(processingTimeService);
         }
 
         @Override
         public int getSubtaskId() {
-            return subtaskIdx;
+            return runtimeContext.getIndexOfThisSubtask();

Review comment:
   No, they cannot change. This information ultimately comes from `TaskInfo`,
   which is bound to the internal runtime `Environment`.
   Even if the value could change, it would actually be better to return the
   current value to the `Sink`, no? The sink can then decide whether to cache
   it (which it should in pretty much all cases) or not.
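
   To illustrate the caching point, here is a minimal, self-contained sketch of a writer that reads the subtask index and parallelism once at construction time. It does not use the real Flink classes: the `InitContext` interface below is a hypothetical stand-in that only mirrors the two getter names from the diff, and `CachingWriter` and `CountingContext` are made-up names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CachingWriterSketch {

    /** Hypothetical stand-in for the two Sink.InitContext getters shown in the diff. */
    interface InitContext {
        int getSubtaskId();

        int getNumberOfParallelSubtasks();
    }

    /** Hypothetical writer that queries the context once and caches both values. */
    static final class CachingWriter {
        private final int subtaskId;
        private final int parallelism;

        CachingWriter(InitContext context) {
            // Read once; later calls use the cached fields only.
            this.subtaskId = context.getSubtaskId();
            this.parallelism = context.getNumberOfParallelSubtasks();
        }

        String describe() {
            return "subtask " + subtaskId + " of " + parallelism;
        }
    }

    /** Test context that counts how often the subtask index is requested. */
    static final class CountingContext implements InitContext {
        final AtomicInteger subtaskIdCalls = new AtomicInteger();
        private final int subtaskId;
        private final int parallelism;

        CountingContext(int subtaskId, int parallelism) {
            this.subtaskId = subtaskId;
            this.parallelism = parallelism;
        }

        @Override
        public int getSubtaskId() {
            subtaskIdCalls.incrementAndGet();
            return subtaskId;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return parallelism;
        }
    }

    public static void main(String[] args) {
        CountingContext context = new CountingContext(3, 8);
        CachingWriter writer = new CachingWriter(context);
        writer.describe();
        writer.describe();
        System.out.println(
                writer.describe()
                        + ", context queried "
                        + context.subtaskIdCalls.get()
                        + " time(s)");
    }
}
```

   With this split, the context simply reports whatever the runtime currently holds, and the decision to cache stays with the sink implementation.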




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.