This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-4685
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-4685 by this
push:
new 4c64927 GEODE-4685: Cleaned up the overriding of readSerialized to
reset to the previous value. Added cache to AbstractJdbcCallback.java so that
child classes can access it. Replaced AtomicLong with LongAdder.
4c64927 is described below
commit 4c64927d769daf744f057e619066f2d77af6c64f
Author: Udo <[email protected]>
AuthorDate: Tue Feb 27 16:08:19 2018 -0800
GEODE-4685: Cleaned up the overriding of readSerialized to reset to
the previous value.
Added cache to AbstractJdbcCallback.java so that child classes can
access it.
Replaced AtomicLong with LongAdder.
---
.../geode/connectors/jdbc/JdbcAsyncWriter.java | 30 ++++++++++------------
.../apache/geode/connectors/jdbc/JdbcWriter.java | 10 ++++----
.../jdbc/internal/AbstractJdbcCallback.java | 2 ++
.../geode/cache/query/internal/DefaultQuery.java | 3 ++-
.../cache/query/internal/index/IndexManager.java | 3 ++-
.../internal/streaming/StreamingOperation.java | 3 ++-
.../internal/cache/partitioned/QueryMessage.java | 4 ++-
.../cache/query/internal/cq/CqServiceImpl.java | 3 ++-
.../lucene/internal/IndexRepositoryFactory.java | 4 +--
.../cache/lucene/internal/LuceneEventListener.java | 3 ++-
10 files changed, 35 insertions(+), 30 deletions(-)
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index 03a3ae6..fb6ce90 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -16,6 +16,7 @@ package org.apache.geode.connectors.jdbc;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.Logger;
@@ -38,9 +39,9 @@ import org.apache.geode.pdx.PdxInstance;
public class JdbcAsyncWriter extends AbstractJdbcCallback implements
AsyncEventListener {
private static final Logger logger = LogService.getLogger();
- private AtomicLong totalEvents = new AtomicLong();
- private AtomicLong successfulEvents = new AtomicLong();
- private AtomicLong failedEvents = new AtomicLong();
+ private LongAdder totalEvents = new LongAdder();
+ private LongAdder successfulEvents = new LongAdder();
+ private LongAdder failedEvents = new LongAdder();
@SuppressWarnings("unused")
public JdbcAsyncWriter() {
@@ -60,11 +61,11 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback
implements AsyncEventL
checkInitialized((InternalCache)
events.get(0).getRegion().getRegionService());
}
- InternalCache internalCache = null;
+ Boolean initialPdxReadSerialized =
cache.getPdxRegistry().getPdxReadSerializedOverride();
+ cache.getPdxRegistry().setPdxReadSerializedOverride(true);
try {
for (AsyncEvent event : events) {
- internalCache = (InternalCache) event.getRegion().getRegionService();
- internalCache.getPdxRegistry().setPdxReadSerializedOverride(true);
+
try {
getSqlHandler().write(event.getRegion(), event.getOperation(),
event.getKey(),
getPdxInstance(event));
@@ -75,36 +76,33 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback
implements AsyncEventL
}
}
} finally {
- if (internalCache != null) {
- internalCache.getPdxRegistry().setPdxReadSerializedOverride(false);
- }
+
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
-
return true;
}
long getTotalEvents() {
- return totalEvents.get();
+ return totalEvents.longValue();
}
long getSuccessfulEvents() {
- return successfulEvents.get();
+ return successfulEvents.longValue();
}
long getFailedEvents() {
- return failedEvents.get();
+ return failedEvents.longValue();
}
private void changeSuccessfulEvents(long delta) {
- successfulEvents.addAndGet(delta);
+ successfulEvents.add(delta);
}
private void changeFailedEvents(long delta) {
- failedEvents.addAndGet(delta);
+ failedEvents.add(delta);
}
private void changeTotalEvents(long delta) {
- totalEvents.addAndGet(delta);
+ totalEvents.add(delta);
}
/**
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
index 442dff5..e20ea2d 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -28,7 +28,7 @@ import org.apache.geode.pdx.PdxInstance;
/**
* This class provides synchronous write through to a data source using JDBC.
- *
+ *
* @since Geode 1.4
*/
@Experimental
@@ -77,8 +77,8 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback
implements CacheWrite
}
private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
- InternalCache internalCache = (InternalCache)
event.getRegion().getRegionService();
- internalCache.getPdxRegistry().setPdxReadSerializedOverride(true);
+ Boolean initialPdxReadSerialized =
cache.getPdxRegistry().getPdxReadSerializedOverride();
+ cache.getPdxRegistry().setPdxReadSerializedOverride(true);
try {
Object newValue = event.getNewValue();
if (!(newValue instanceof PdxInstance)) {
@@ -89,14 +89,14 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback
implements CacheWrite
newValue = CopyHelper.copy(newValue);
}
if (newValue != null && !(newValue instanceof PdxInstance)) {
- String valueClassName = newValue == null ? "null" :
newValue.getClass().getName();
+ String valueClassName = newValue.getClass().getName();
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports PDX values; newValue is " + valueClassName);
}
}
return (PdxInstance) newValue;
} finally {
- internalCache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 8f28d67..98bb42a 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -24,6 +24,7 @@ import org.apache.geode.internal.cache.InternalCache;
public abstract class AbstractJdbcCallback implements CacheCallback {
private volatile SqlHandler sqlHandler;
+ protected volatile InternalCache cache;
protected AbstractJdbcCallback() {
// nothing
@@ -57,6 +58,7 @@ public abstract class AbstractJdbcCallback implements
CacheCallback {
private synchronized void initialize(InternalCache cache) {
if (sqlHandler == null) {
+ this.cache = cache;
JdbcConnectorService service =
cache.getService(JdbcConnectorService.class);
DataSourceManager manager = new DataSourceManager(new
HikariJdbcDataSourceFactory());
sqlHandler = new SqlHandler(manager, service);
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 27501fc..1beb0cb 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -319,6 +319,7 @@ public class DefaultQuery implements Query {
QueryExecutor qe = checkQueryOnPR(params);
Object result = null;
+ Boolean initialPdxReadSerialized =
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
try {
// Setting the readSerialized flag for local queries
this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
@@ -398,7 +399,7 @@ public class DefaultQuery implements Query {
"Query was canceled. It may be due to low memory or the query was
running longer than the MAX_QUERY_EXECUTION_TIME.");
}
} finally {
- this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
if (queryMonitor != null) {
queryMonitor.stopMonitoringQueryThread(Thread.currentThread(), this);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index 6880465..4340b7c 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -1001,6 +1001,7 @@ public class IndexManager {
*/
private void processAction(RegionEntry entry, int action, int opCode) throws
QueryException {
final long startPA = getCachePerfStats().startIndexUpdate();
+ Boolean initialPdxReadSerialized =
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
TXStateProxy tx = null;
if (!this.cache.isClient()) {
@@ -1146,7 +1147,7 @@ public class IndexManager {
}
}
} finally {
- this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
((TXManagerImpl)
this.cache.getCacheTransactionManager()).unpauseTransaction(tx);
getCachePerfStats().endIndexUpdate(startPA);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
index 1c3ba3e..1ad5ec0 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
@@ -505,6 +505,7 @@ public abstract class StreamingOperation {
Version senderVersion =
InternalDataSerializer.getVersionForDataStream(in);
boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0;
InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
+ Boolean initialPdxReadSerialized =
cache.getPdxRegistry().getPdxReadSerializedOverride();
if (n == -1) {
this.objectList = null;
} else {
@@ -559,7 +560,7 @@ public abstract class StreamingOperation {
}
} finally {
if (this.pdxReadSerialized) {
- cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 0410502..8098c41 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -169,6 +169,8 @@ public class QueryMessage extends
StreamingPartitionOperation.StreamingPartition
DefaultQuery query = new DefaultQuery(this.queryString, pr.getCache(),
false);
// Remote query, use the PDX types in serialized form.
+ Boolean initialPdxReadSerialized =
+ pr.getCache().getPdxRegistry().getPdxReadSerializedOverride();
pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(true);
// In case of "select *" queries we can keep the results in serialized
form and send
query.setRemoteQuery(true);
@@ -249,7 +251,7 @@ public class QueryMessage extends
StreamingPartitionOperation.StreamingPartition
if (isQueryTraced) {
this.resultCollector.remove(queryTraceList);
}
- pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(false);
+
pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
query.setRemoteQuery(false);
query.endTrace(indexObserver, traceStartTime, this.resultCollector);
}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
index d40708b..8f99f87 100644
---
a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
@@ -1187,11 +1187,12 @@ public class CqServiceImpl implements CqService {
processRegionEvent(event, localProfile, profiles, frInfo);
} else {
// Use the PDX types in serialized form.
+ Boolean initialPdxReadSerialized =
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
try {
processEntryEvent(event, localProfile, profiles, frInfo);
} finally {
- this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
}
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index bfaee79..ee64cc1 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -138,9 +138,7 @@ public class IndexRepositoryFactory {
throws IOException {
Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
- Iterator keysIterator = dataBucket.keySet().iterator();
- while (keysIterator.hasNext()) {
- Object key = keysIterator.next();
+ for (Object key : dataBucket.keySet()) {
Object value = getValue(userRegion.getEntry(key));
if (value != null) {
repo.update(key, value);
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index 2a227f7..b9209e7 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -78,6 +78,7 @@ public class LuceneEventListener implements
AsyncEventListener {
protected boolean process(final List<AsyncEvent> events) {
// Try to get a PDX instance if possible, rather than a deserialized object
+ Boolean initialPdxReadSerialized =
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
cache.getPdxRegistry().setPdxReadSerializedOverride(true);
Set<IndexRepository> affectedRepos = new HashSet<>();
@@ -117,7 +118,7 @@ public class LuceneEventListener implements
AsyncEventListener {
} catch (IOException e) {
throw new InternalGemFireError("Unable to save to lucene index", e);
} finally {
- cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].