zentol commented on a change in pull request #14372:
URL: https://github.com/apache/flink/pull/14372#discussion_r542643500
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -128,12 +129,11 @@
*
* <p>The release hook is executed just before the user code class
loader is being released.
* Registration only happens if no hook has been registered under this
name already.
- *
Review comment:
revert
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
+ */
+ private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+ // unregister admin mbean
+ AwsSdkMetrics.unregisterMetricAdminMBean();
+
+ try {
+ // Remove FileAgeManager
+ Class<?> fileAgeManagerClazz =
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true,
classLoader);
Review comment:
Shouldn't we be able to use `getClass().getClassLoader()`? Then we
wouldn't have to modify the `RuntimeContext` API.
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
+ */
+ private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+ // unregister admin mbean
Review comment:
```suggestion
```
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
Review comment:
It would be good to add a version reference for which
`aws-java-sdk-core` was used.
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
+ */
+ private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+ // unregister admin mbean
+ AwsSdkMetrics.unregisterMetricAdminMBean();
+
+ try {
+ // Remove FileAgeManager
+ Class<?> fileAgeManagerClazz =
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true,
classLoader);
+ Field instanceField =
fileAgeManagerClazz.getDeclaredField("instance");
+ instanceField.setAccessible(true);
+
+ // unset (static final) field FileAgeManager.instance
+ Field modifiersField =
Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(instanceField,
instanceField.getModifiers() & ~Modifier.FINAL);
+ Object fileAgeManager = instanceField.get(null);
+ instanceField.set(null, null);
+
+ // shutdown thread pool
Review comment:
This should be the key change necessary to ensure the ClassLoader can be
cleaned up.
We shouldn't have to touch the FileAgeManager#instance reference; so long as
no thread has references to them we should be good.
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
+ */
+ private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+ // unregister admin mbean
+ AwsSdkMetrics.unregisterMetricAdminMBean();
+
+ try {
+ // Remove FileAgeManager
+ Class<?> fileAgeManagerClazz =
Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true,
classLoader);
+ Field instanceField =
fileAgeManagerClazz.getDeclaredField("instance");
+ instanceField.setAccessible(true);
+
+ // unset (static final) field FileAgeManager.instance
+ Field modifiersField =
Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(instanceField,
instanceField.getModifiers() & ~Modifier.FINAL);
+ Object fileAgeManager = instanceField.get(null);
+ instanceField.set(null, null);
+
+ // shutdown thread pool
+ Field executorField =
fileAgeManagerClazz.getDeclaredField("executorService");
+ executorField.setAccessible(true);
+ ExecutorService executorService = (ExecutorService)
executorField.get(fileAgeManager);
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+ // Remove InstanceProfileCredentialsProvider
+ Class<?> credProviderClazz =
Class.forName("com.amazonaws.auth.InstanceProfileCredentialsProvider", true,
classLoader);
Review comment:
Why are we clearing this singleton?
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
}
}
}
+
+ /**
+ * Remove references created by the producer, preventing the
classloader to unload. References were
+ * analyzed as of version 0.14.0.
+ */
+ private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+ // unregister admin mbean
+ AwsSdkMetrics.unregisterMetricAdminMBean();
+
+ try {
+ // Remove FileAgeManager
Review comment:
seems outdated?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]