zentol commented on a change in pull request #14372:
URL: https://github.com/apache/flink/pull/14372#discussion_r542643500



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -128,12 +129,11 @@
         *
         * <p>The release hook is executed just before the user code class 
loader is being released.
         * Registration only happens if no hook has been registered under this 
name already.
-        *

Review comment:
       revert

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.
+        */
+       private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+               // unregister admin mbean
+               AwsSdkMetrics.unregisterMetricAdminMBean();
+
+               try {
+                       // Remove FileAgeManager
+                       Class<?> fileAgeManagerClazz = 
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, 
classLoader);

Review comment:
       Shouldn't we be able to use `getClass().getClassLoader()`? Then we 
wouldn't have to modify the `RuntimeContext` API.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.
+        */
+       private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+               // unregister admin mbean

Review comment:
       ```suggestion
   ```

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.

Review comment:
       It would be good to add a version reference for which 
`aws-java-sdk-core` was used.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.
+        */
+       private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+               // unregister admin mbean
+               AwsSdkMetrics.unregisterMetricAdminMBean();
+
+               try {
+                       // Remove FileAgeManager
+                       Class<?> fileAgeManagerClazz = 
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, 
classLoader);
+                       Field instanceField = 
fileAgeManagerClazz.getDeclaredField("instance");
+                       instanceField.setAccessible(true);
+
+                       // unset (static final) field FileAgeManager.instance
+                       Field modifiersField = 
Field.class.getDeclaredField("modifiers");
+                       modifiersField.setAccessible(true);
+                       modifiersField.setInt(instanceField, 
instanceField.getModifiers() & ~Modifier.FINAL);
+                       Object fileAgeManager = instanceField.get(null);
+                       instanceField.set(null, null);
+
+                       // shutdown thread pool

Review comment:
       This should be the key change necessary to ensure the ClassLoader can be 
cleaned up.
   We shouldn't have to touch the FileAgeManager#instance reference; so long as 
no thread has references to them we should be good.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.
+        */
+       private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+               // unregister admin mbean
+               AwsSdkMetrics.unregisterMetricAdminMBean();
+
+               try {
+                       // Remove FileAgeManager
+                       Class<?> fileAgeManagerClazz = 
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, 
classLoader);
+                       Field instanceField = 
fileAgeManagerClazz.getDeclaredField("instance");
+                       instanceField.setAccessible(true);
+
+                       // unset (static final) field FileAgeManager.instance
+                       Field modifiersField = 
Field.class.getDeclaredField("modifiers");
+                       modifiersField.setAccessible(true);
+                       modifiersField.setInt(instanceField, 
instanceField.getModifiers() & ~Modifier.FINAL);
+                       Object fileAgeManager = instanceField.get(null);
+                       instanceField.set(null, null);
+
+                       // shutdown thread pool
+                       Field executorField = 
fileAgeManagerClazz.getDeclaredField("executorService");
+                       executorField.setAccessible(true);
+                       ExecutorService executorService = (ExecutorService) 
executorField.get(fileAgeManager);
+                       executorService.shutdown();
+                       executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+                       // Remove InstanceProfileCredentialsProvider
+                       Class<?> credProviderClazz = 
Class.forName("com.amazonaws.auth.InstanceProfileCredentialsProvider", true, 
classLoader);

Review comment:
       Why are we clearing this singleton?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
                        }
                }
        }
+
+       /**
+        * Remove references created by the producer, preventing the 
classloader to unload. References were
+        * analyzed as of version 0.14.0.
+        */
+       private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+               // unregister admin mbean
+               AwsSdkMetrics.unregisterMetricAdminMBean();
+
+               try {
+                       // Remove FileAgeManager

Review comment:
       seems outdated?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to