[
https://issues.apache.org/jira/browse/HADOOP-18456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606974#comment-17606974
]
ASF GitHub Bot commented on HADOOP-18456:
-----------------------------------------
mehakmeet commented on code in PR #4909:
URL: https://github.com/apache/hadoop/pull/4909#discussion_r974970318
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java:
##########
@@ -147,7 +206,7 @@ private void assertMapContainsKey(int key) {
.isTrue();
}
- /**
+ /**y
Review Comment:
nit: typo
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java:
##########
@@ -125,11 +128,67 @@ public void testDemandCreateEntries() {
}
+ /**
+ * It is an error to have a factory which returns null.
+ */
+ @Test
+ public void testFactoryReturningNull() throws Throwable {
+ referenceMap = new WeakReferenceMap<>(
+ (k) -> null,
+ null);
+ intercept(NullPointerException.class, () ->
+ referenceMap.get(0));
+ }
+
+ /**
+ * Test the WeakReferenceThreadMap extension.
+ */
+ @Test
+ public void testWeakReferenceThreadMapRejectsNullAssignment()
+ throws Throwable {
+ WeakReferenceThreadMap<String> threadMap = new WeakReferenceThreadMap<>(
+ id -> "Entry for thread ID " + id,
+ null);
+
+ Assertions.assertThat(threadMap.setForCurrentThread("hello"))
+ .describedAs("current thread map value on first set")
+ .isNull();
+
+ // second attempt returns itself
+ Assertions.assertThat(threadMap.setForCurrentThread("hello"))
Review Comment:
Small question here: if we set it to "hello2" this time, does set() return
"hello" or "hello2"?
Could you make the next assert use a value different from "hello", to
confirm that set() actually returns the previously set value?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java:
##########
@@ -132,35 +145,92 @@ public WeakReference<V> lookup(K key) {
* @return an instance.
*/
public V get(K key) {
- final WeakReference<V> current = lookup(key);
- V val = resolve(current);
- if (val != null) {
+ final WeakReference<V> currentWeakRef = lookup(key);
+ // resolve it, after which if not null, we have a strong reference
+ V strongVal = resolve(currentWeakRef);
+ if (strongVal != null) {
// all good.
- return val;
+ return strongVal;
}
- // here, either no ref, or the value is null
- if (current != null) {
+ // here, either currentWeakRef was null, or its reference was GC'd.
+ if (currentWeakRef != null) {
+ // garbage collection removed the reference.
+
+ // explicitly remove the weak ref from the map if it has not
+ // been updated by this point
+ // this is here just for completeness.
+ map.remove(key, currentWeakRef);
+
+ // log/report the loss.
noteLost(key);
}
+
+ // create a new value and add it to the map
return create(key);
}
/**
* Create a new instance under a key.
+ * <p>
* The instance is created, added to the map and then the
* map value retrieved.
* This ensures that the reference returned is that in the map,
* even if there is more than one entry being created at the same time.
+ * If that race does occur, it will be logged the first time it happens
+ * for this specific map instance.
+ * <p>
+ * HADOOP-18456 highlighted the risk of a concurrent GC resulting in a null
+ * value being retrieved and so returned.
+ * To prevent this:
+ * <ol>
+ * <li>A strong reference is retained to the newly created instance
+ * in a local variable.</li>
+ * <li>That variable is used after the resolution process, to ensure
+ * the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
+ * <li>A check is made for the resolved reference being null, and if so,
+ * the put() is repeated</li>
+ * </ol>
* @param key key
- * @return the value
+ * @return the created value
*/
public V create(K key) {
entriesCreatedCount.incrementAndGet();
- WeakReference<V> newRef = new WeakReference<>(
- requireNonNull(factory.apply(key)));
- map.put(key, newRef);
- return map.get(key).get();
+ /*
+ Get a strong ref so even if a GC happens in this method the reference is not lost.
+ It is NOT enough to have a reference in a field, it MUST be used
+ so as to ensure the reference isn't optimized away prematurely.
+ "A reachable object is any object that can be accessed in any potential continuing
+ computation from any live thread."
+ */
+
+ final V strongRef = requireNonNull(factory.apply(key));
+ V resolvedStrongRef;
+ do {
+ WeakReference<V> newWeakRef = new WeakReference<>(strongRef);
+
+ // put it in the map
+ map.put(key, newWeakRef);
+
+ // get it back from the map
+ WeakReference<V> retrievedWeakRef = map.get(key);
+ // resolve that reference, handling the situation where somehow it was removed from the map
+ // between the put() and the get()
+ resolvedStrongRef = resolve(retrievedWeakRef);
+ if (resolvedStrongRef == null) {
+ referenceLostDuringCreation.warn("reference to %s lost during creation", key);
+ noteLost(key);
+ }
+ } while (resolvedStrongRef == null);
+
+ // note if there was any change in the reference.
+ // as this forces strongRef to be kept in scope
+ if (strongRef != resolvedStrongRef) {
+ LOG.debug("Created instance for key {}: {} overwritten by {}",
Review Comment:
If this happens, shouldn't we raise an exception? Aren't we returning the
wrong value in that case?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java:
##########
@@ -36,30 +38,55 @@ public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
super(factory, referenceLost);
}
+ /**
+ * Get the value for the current thread, creating if needed.
+ * @return an instance.
+ */
public V getForCurrentThread() {
return get(currentThreadId());
}
+ /**
+ * Remove the reference for the current thread.
+ * @return any reference value which existed.
+ */
public V removeForCurrentThread() {
return remove(currentThreadId());
}
+ /**
+ * Get the current thread ID.
+ * @return thread ID.
+ */
public long currentThreadId() {
return Thread.currentThread().getId();
}
+ /**
+ * Set the new value for the current thread.
+ * @param newVal new reference to set for the active thread.
+ * @return any old value, possibly null
Review Comment:
nit: "any old value", or should this be "previously set value"?
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java:
##########
@@ -125,11 +128,67 @@ public void testDemandCreateEntries() {
}
+ /**
+ * It is an error to have a factory which returns null.
+ */
+ @Test
+ public void testFactoryReturningNull() throws Throwable {
+ referenceMap = new WeakReferenceMap<>(
+ (k) -> null,
+ null);
+ intercept(NullPointerException.class, () ->
+ referenceMap.get(0));
+ }
+
+ /**
+ * Test the WeakReferenceThreadMap extension.
+ */
+ @Test
+ public void testWeakReferenceThreadMapRejectsNullAssignment()
+ throws Throwable {
+ WeakReferenceThreadMap<String> threadMap = new WeakReferenceThreadMap<>(
+ id -> "Entry for thread ID " + id,
+ null);
+
+ Assertions.assertThat(threadMap.setForCurrentThread("hello"))
+ .describedAs("current thread map value on first set")
+ .isNull();
+
+ // second attempt returns itself
+ Assertions.assertThat(threadMap.setForCurrentThread("hello"))
+ .describedAs("current thread map value on second set")
+ .isEqualTo("hello");
+
+ // it is forbidden to explictly set to null via the set() call.
Review Comment:
nit: typo "explictly"
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java:
##########
@@ -132,35 +145,92 @@ public WeakReference<V> lookup(K key) {
* @return an instance.
*/
public V get(K key) {
- final WeakReference<V> current = lookup(key);
- V val = resolve(current);
- if (val != null) {
+ final WeakReference<V> currentWeakRef = lookup(key);
+ // resolve it, after which if not null, we have a strong reference
+ V strongVal = resolve(currentWeakRef);
+ if (strongVal != null) {
// all good.
- return val;
+ return strongVal;
}
- // here, either no ref, or the value is null
- if (current != null) {
+ // here, either currentWeakRef was null, or its reference was GC'd.
+ if (currentWeakRef != null) {
+ // garbage collection removed the reference.
+
+ // explicitly remove the weak ref from the map if it has not
+ // been updated by this point
+ // this is here just for completeness.
+ map.remove(key, currentWeakRef);
+
+ // log/report the loss.
noteLost(key);
}
+
+ // create a new value and add it to the map
return create(key);
}
/**
* Create a new instance under a key.
+ * <p>
* The instance is created, added to the map and then the
* map value retrieved.
* This ensures that the reference returned is that in the map,
* even if there is more than one entry being created at the same time.
+ * If that race does occur, it will be logged the first time it happens
+ * for this specific map instance.
+ * <p>
+ * HADOOP-18456 highlighted the risk of a concurrent GC resulting in a null
+ * value being retrieved and so returned.
+ * To prevent this:
+ * <ol>
+ * <li>A strong reference is retained to the newly created instance
+ * in a local variable.</li>
+ * <li>That variable is used after the resolution process, to ensure
+ * the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
+ * <li>A check is made for the resolved reference being null, and if so,
+ * the put() is repeated</li>
+ * </ol>
* @param key key
- * @return the value
+ * @return the created value
*/
public V create(K key) {
entriesCreatedCount.incrementAndGet();
- WeakReference<V> newRef = new WeakReference<>(
- requireNonNull(factory.apply(key)));
- map.put(key, newRef);
- return map.get(key).get();
+ /*
+ Get a strong ref so even if a GC happens in this method the reference is not lost.
+ It is NOT enough to have a reference in a field, it MUST be used
+ so as to ensure the reference isn't optimized away prematurely.
+ "A reachable object is any object that can be accessed in any potential continuing
+ computation from any live thread."
+ */
+
+ final V strongRef = requireNonNull(factory.apply(key));
Review Comment:
Please pass a message to requireNonNull() here, so the failure clearly says the factory returned a null instance.
> NullPointerException in ObjectListingIterator's constructor
> -----------------------------------------------------------
>
> Key: HADOOP-18456
> URL: https://issues.apache.org/jira/browse/HADOOP-18456
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/s3
> Affects Versions: 3.3.9
> Reporter: Quanlong Huang
> Assignee: Steve Loughran
> Priority: Blocker
> Labels: pull-request-available
>
> We saw NullPointerExceptions in Impala's S3 tests: IMPALA-11592. It's thrown
> from the hadoop jar:
> {noformat}
> Caused by: java.lang.NullPointerException
> at org.apache.hadoop.fs.s3a.Listing$ObjectListingIterator.<init>(Listing.java:621)
> at org.apache.hadoop.fs.s3a.Listing.createObjectListingIterator(Listing.java:163)
> at org.apache.hadoop.fs.s3a.Listing.createFileStatusListingIterator(Listing.java:144)
> at org.apache.hadoop.fs.s3a.Listing.getListFilesAssumingDir(Listing.java:212)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListFiles(S3AFileSystem.java:4790)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listFiles$37(S3AFileSystem.java:4732)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2363)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2382)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.listFiles(S3AFileSystem.java:4731)
> at org.apache.impala.common.FileSystemUtil.listFiles(FileSystemUtil.java:754)
> ... {noformat}
> We are using a private build of the Hadoop jar. Version: CDP 3.1.1.7.2.16.0-164
> Code snippet of where the NPE is thrown:
> {code:java}
> 604 @Retries.RetryRaw
> 605 ObjectListingIterator(
> 606 Path listPath,
> 607 S3ListRequest request,
> 608 AuditSpan span) throws IOException {
> 609 this.listPath = listPath;
> 610 this.maxKeys = listingOperationCallbacks.getMaxKeys();
> 611 this.request = request;
> 612 this.objectsPrev = null;
> 613 this.iostats = iostatisticsStore()
> 614 .withDurationTracking(OBJECT_LIST_REQUEST)
> 615 .withDurationTracking(OBJECT_CONTINUE_LIST_REQUEST)
> 616 .build();
> 617 this.span = span;
> 618 this.s3ListResultFuture = listingOperationCallbacks
> 619 .listObjectsAsync(request, iostats, span);
> 620 this.aggregator = IOStatisticsContext.getCurrentIOStatisticsContext()
> 621 .getAggregator(); // <---- thrown here
> 622 }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]