This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch GEODE-6143-offheap-02 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5ae31eea5e321c5d25a404cd143e432abffff68c Author: Kirk Lund <kl...@apache.org> AuthorDate: Mon Mar 22 15:54:01 2021 -0700 Setup for delegate --- .../internal/cache/AbstractBucketRegionQueue.java | 4 +- .../geode/internal/cache/AbstractRegionMap.java | 2 +- .../geode/internal/cache/BucketRegionQueue.java | 4 +- .../cache/wan/serial/SerialGatewaySenderQueue.java | 4 +- .../internal/offheap/OffHeapRegionEntryHelper.java | 104 ++++++++++++--------- .../offheap/OffHeapRegionEntryHelperInstance.java | 18 ++++ .../concurrent/CustomEntryConcurrentHashMap.java | 4 +- .../offheap/OffHeapRegionEntryHelperJUnitTest.java | 8 +- .../ParallelGatewaySenderOperationsDUnitTest.java | 2 +- 9 files changed, 92 insertions(+), 58 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index 0813eab..236342a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -401,7 +401,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { @Override public void closeEntries() { - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { AbstractBucketRegionQueue.super.closeEntries(); @@ -414,7 +414,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { @Override public Set<VersionSource> clearEntries(final RegionVersionVector rvv) { final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>(); - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { result.set(AbstractBucketRegionQueue.super.clearEntries(rvv)); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 9a0d8b6..2238d15 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -433,7 +433,7 @@ public abstract class AbstractRegionMap extends BaseRegionMap boolean tombstone = re.isTombstone(); // note: it.remove() did not reliably remove the entry so we use remove(K,V) here if (getEntryMap().remove(re.getKey(), re)) { - if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) { + if (OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap()) { GatewaySenderEventImpl.release(re.getValue()); // OFFHEAP _getValue ok } // If this is an overflow only region, we need to free the entry on diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index f9d5ab7..5de47d3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -208,7 +208,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { @Override public void closeEntries() { - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { BucketRegionQueue.super.closeEntries(); @@ -221,7 +221,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { @Override public Set<VersionSource> clearEntries(final RegionVersionVector rvv) { final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>(); - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { result.set(BucketRegionQueue.super.clearEntries(rvv)); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 45f4f84..2cc7b34 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -1364,7 +1364,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { @Override public void closeEntries() { - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { SerialGatewaySenderQueueMetaRegion.super.closeEntries(); @@ -1375,7 +1375,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { @Override public Set<VersionSource> clearEntries(final RegionVersionVector rvv) { final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>(); - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { result.set(SerialGatewaySenderQueueMetaRegion.super.clearEntries(rvv)); diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java index 0d10311..cef79cb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java @@ -47,33 +47,13 @@ public class OffHeapRegionEntryHelper { protected static final long TOMBSTONE_ADDRESS = 8L << 1; public static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8; - @Immutable - private static final Token[] addrToObj = - new Token[] {null, Token.INVALID, Token.LOCAL_INVALID, Token.DESTROYED, Token.REMOVED_PHASE1, - Token.REMOVED_PHASE2, Token.END_OF_STREAM, Token.NOT_AVAILABLE, Token.TOMBSTONE,}; - - private static long objectToAddress(@Unretained Object v) { - if (v instanceof StoredObject) - return ((StoredObject) v).getAddress(); - if (v == null) - return NULL_ADDRESS; - if (v == Token.TOMBSTONE) - return TOMBSTONE_ADDRESS; - if (v == Token.INVALID) - return INVALID_ADDRESS; - if (v == Token.LOCAL_INVALID) - return LOCAL_INVALID_ADDRESS; - if (v == Token.DESTROYED) - return DESTROYED_ADDRESS; - if (v == Token.REMOVED_PHASE1) - return REMOVED_PHASE1_ADDRESS; - if (v == Token.REMOVED_PHASE2) - return REMOVED_PHASE2_ADDRESS; - if (v == Token.END_OF_STREAM) - return END_OF_STREAM_ADDRESS; - if (v == Token.NOT_AVAILABLE) - return NOT_AVAILABLE_ADDRESS; - throw new IllegalStateException("Can not convert " + v + " to an off heap address."); + private static final OffHeapRegionEntryHelper INSTANCE = new OffHeapRegionEntryHelper( + new OffHeapRegionEntryHelperInstance()); + + private final OffHeapRegionEntryHelperInstance delegate; + + public OffHeapRegionEntryHelper(OffHeapRegionEntryHelperInstance delegate) { + this.delegate = delegate; } /** @@ -132,7 +112,7 @@ public class OffHeapRegionEntryHelper { } return result; } else { - return addrToObj[(int) ohAddress >> 1]; + return TokenAddress.addrToObj[(int) ohAddress >> 1]; } } @@ -159,7 +139,7 @@ public class OffHeapRegionEntryHelper { if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) { return Token.NOT_A_TOKEN; } else { - return addrToObj[(int) ohAddress >> 1]; + return TokenAddress.addrToObj[(int) ohAddress >> 1]; } } @@ -191,8 +171,8 @@ public class OffHeapRegionEntryHelper { public static void releaseEntry(@Unretained OffHeapRegionEntry re, @Released StoredObject expectedValue) { - long oldAddress = objectToAddress(expectedValue); - final long newAddress = objectToAddress(Token.REMOVED_PHASE2); + long oldAddress = TokenAddress.objectToAddress(expectedValue); + final long newAddress = TokenAddress.objectToAddress(Token.REMOVED_PHASE2); if (re.setAddress(oldAddress, newAddress)) { releaseAddress(oldAddress); } /* @@ -341,7 +321,7 @@ public class OffHeapRegionEntryHelper { public static void setValue(@Released OffHeapRegionEntry re, @Unretained Object v) { // setValue is called when synced so I don't need to worry // about oldAddress being released by someone else. - final long newAddress = objectToAddress(v); + final long newAddress = TokenAddress.objectToAddress(v); long oldAddress; do { oldAddress = re.getAddress(); @@ -367,7 +347,7 @@ public class OffHeapRegionEntryHelper { if (addr < 0) return true; addr >>= 1; // shift right 1 to convert to array index; - return addr >= addrToObj.length; + return addr >= TokenAddress.addrToObj.length; } /** @@ -414,8 +394,6 @@ public class OffHeapRegionEntryHelper { return addressToObject(addr, decompress, context); } - - public static boolean isSerialized(long address) { return (address & SERIALIZED_BIT) != 0; } @@ -424,18 +402,56 @@ public class OffHeapRegionEntryHelper { return (address & COMPRESSED_BIT) != 0; } - private static final ThreadLocal<Object> clearNeedsToCheckForOffHeap = new ThreadLocal<Object>(); + public static class OffHeapClear { + + private static final ThreadLocal<Object> clearNeedsToCheckForOffHeap = + new ThreadLocal<Object>(); + + public static boolean doesClearNeedToCheckForOffHeap() { + return clearNeedsToCheckForOffHeap.get() != null; + } - public static boolean doesClearNeedToCheckForOffHeap() { - return clearNeedsToCheckForOffHeap.get() != null; + public static void doWithOffHeapClear(Runnable r) { + clearNeedsToCheckForOffHeap.set(Boolean.TRUE); + try { + r.run(); + } finally { + clearNeedsToCheckForOffHeap.remove(); + } + } } - public static void doWithOffHeapClear(Runnable r) { - clearNeedsToCheckForOffHeap.set(Boolean.TRUE); - try { - r.run(); - } finally { - clearNeedsToCheckForOffHeap.remove(); + private static class TokenAddress { + + @Immutable + private static final Token[] addrToObj = + new Token[] {null, Token.INVALID, Token.LOCAL_INVALID, Token.DESTROYED, + Token.REMOVED_PHASE1, + Token.REMOVED_PHASE2, Token.END_OF_STREAM, Token.NOT_AVAILABLE, Token.TOMBSTONE,}; + + private static long objectToAddress(@Unretained Object v) { + if (v instanceof StoredObject) + return ((StoredObject) v).getAddress(); + if (v == null) + return NULL_ADDRESS; + if (v == Token.TOMBSTONE) + return TOMBSTONE_ADDRESS; + if (v == Token.INVALID) + return INVALID_ADDRESS; + if (v == Token.LOCAL_INVALID) + return LOCAL_INVALID_ADDRESS; + if (v == Token.DESTROYED) + return DESTROYED_ADDRESS; + if (v == Token.REMOVED_PHASE1) + return REMOVED_PHASE1_ADDRESS; + if (v == Token.REMOVED_PHASE2) + return REMOVED_PHASE2_ADDRESS; + if (v == Token.END_OF_STREAM) + return END_OF_STREAM_ADDRESS; + if (v == Token.NOT_AVAILABLE) + return NOT_AVAILABLE_ADDRESS; + throw new IllegalStateException("Can not convert " + v + " to an off heap address."); } } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java new file mode 100644 index 0000000..9218765 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.offheap; + +public class OffHeapRegionEntryHelperInstance { +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java index 51fe988..797779a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java @@ -1020,7 +1020,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V> // Geode changes BEGIN if (clearedEntries == null) { final boolean checkForGatewaySenderEvent = - OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap(); + OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap(); if (checkForGatewaySenderEvent) { clearedEntries = new ArrayList<HashEntry<?, ?>>(); } else { @@ -1805,7 +1805,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V> if (entries != null) { final ArrayList<HashEntry<?, ?>> clearedEntries = entries; Runnable runnable; - if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) { + if (OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap()) { runnable = new Runnable() { @Override public void run() { diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java index 3c219ff..f22651b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperJUnitTest.java @@ -910,17 +910,17 @@ public class OffHeapRegionEntryHelperJUnitTest { @Test public void doWithOffHeapClearShouldSetTheThreadLocalToTrue() { // verify that threadlocal is not set - assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isFalse(); + assertThat(OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap()).isFalse(); - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { // verify that threadlocal is set when offheap is cleared - assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isTrue(); + assertThat(OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap()).isTrue(); } }); // verify that threadlocal is reset after offheap is cleared - assertThat(OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()).isFalse(); + assertThat(OffHeapRegionEntryHelper.OffHeapClear.doesClearNeedToCheckForOffHeap()).isFalse(); } } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index 2842e84..280c73e 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -856,7 +856,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { // above. callables.add(Executors.callable(() -> { try { - OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + OffHeapRegionEntryHelper.OffHeapClear.doWithOffHeapClear(new Runnable() { @Override public void run() { // Wait for the cache writer to be invoked to release this countdown latch.