This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-4685
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-4685 by this push:
     new 4c64927  GEODE-4685: Cleaned up the overriding of readSerialized so that it is reset to its previous value. Added cache to AbstractJdbcCallback.java so that child classes can access it. Replaced AtomicLong with LongAdder.
4c64927 is described below

commit 4c64927d769daf744f057e619066f2d77af6c64f
Author: Udo <[email protected]>
AuthorDate: Tue Feb 27 16:08:19 2018 -0800

    GEODE-4685: Cleaned up the overriding of readSerialized so that it is reset to its previous value.
    Added cache to AbstractJdbcCallback.java so that child classes can access it.
    Replaced AtomicLong with LongAdder.
---
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     | 30 ++++++++++------------
 .../apache/geode/connectors/jdbc/JdbcWriter.java   | 10 ++++----
 .../jdbc/internal/AbstractJdbcCallback.java        |  2 ++
 .../geode/cache/query/internal/DefaultQuery.java   |  3 ++-
 .../cache/query/internal/index/IndexManager.java   |  3 ++-
 .../internal/streaming/StreamingOperation.java     |  3 ++-
 .../internal/cache/partitioned/QueryMessage.java   |  4 ++-
 .../cache/query/internal/cq/CqServiceImpl.java     |  3 ++-
 .../lucene/internal/IndexRepositoryFactory.java    |  4 +--
 .../cache/lucene/internal/LuceneEventListener.java |  3 ++-
 10 files changed, 35 insertions(+), 30 deletions(-)

diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index 03a3ae6..fb6ce90 100644
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -16,6 +16,7 @@ package org.apache.geode.connectors.jdbc;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.logging.log4j.Logger;
 
@@ -38,9 +39,9 @@ import org.apache.geode.pdx.PdxInstance;
 public class JdbcAsyncWriter extends AbstractJdbcCallback implements 
AsyncEventListener {
   private static final Logger logger = LogService.getLogger();
 
-  private AtomicLong totalEvents = new AtomicLong();
-  private AtomicLong successfulEvents = new AtomicLong();
-  private AtomicLong failedEvents = new AtomicLong();
+  private LongAdder totalEvents = new LongAdder();
+  private LongAdder successfulEvents = new LongAdder();
+  private LongAdder failedEvents = new LongAdder();
 
   @SuppressWarnings("unused")
   public JdbcAsyncWriter() {
@@ -60,11 +61,11 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback 
implements AsyncEventL
       checkInitialized((InternalCache) 
events.get(0).getRegion().getRegionService());
     }
 
-    InternalCache internalCache = null;
+    Boolean initialPdxReadSerialized = 
cache.getPdxRegistry().getPdxReadSerializedOverride();
+    cache.getPdxRegistry().setPdxReadSerializedOverride(true);
     try {
       for (AsyncEvent event : events) {
-        internalCache = (InternalCache) event.getRegion().getRegionService();
-        internalCache.getPdxRegistry().setPdxReadSerializedOverride(true);
+
         try {
           getSqlHandler().write(event.getRegion(), event.getOperation(), 
event.getKey(),
               getPdxInstance(event));
@@ -75,36 +76,33 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback 
implements AsyncEventL
         }
       }
     } finally {
-      if (internalCache != null) {
-        internalCache.getPdxRegistry().setPdxReadSerializedOverride(false);
-      }
+      
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
     }
-
     return true;
   }
 
   long getTotalEvents() {
-    return totalEvents.get();
+    return totalEvents.longValue();
   }
 
   long getSuccessfulEvents() {
-    return successfulEvents.get();
+    return successfulEvents.longValue();
   }
 
   long getFailedEvents() {
-    return failedEvents.get();
+    return failedEvents.longValue();
   }
 
   private void changeSuccessfulEvents(long delta) {
-    successfulEvents.addAndGet(delta);
+    successfulEvents.add(delta);
   }
 
   private void changeFailedEvents(long delta) {
-    failedEvents.addAndGet(delta);
+    failedEvents.add(delta);
   }
 
   private void changeTotalEvents(long delta) {
-    totalEvents.addAndGet(delta);
+    totalEvents.add(delta);
   }
 
   /**
diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
index 442dff5..e20ea2d 100644
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -28,7 +28,7 @@ import org.apache.geode.pdx.PdxInstance;
 
 /**
  * This class provides synchronous write through to a data source using JDBC.
- *
+ * 
  * @since Geode 1.4
  */
 @Experimental
@@ -77,8 +77,8 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback 
implements CacheWrite
   }
 
   private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
-    InternalCache internalCache = (InternalCache) 
event.getRegion().getRegionService();
-    internalCache.getPdxRegistry().setPdxReadSerializedOverride(true);
+    Boolean initialPdxReadSerialized = 
cache.getPdxRegistry().getPdxReadSerializedOverride();
+    cache.getPdxRegistry().setPdxReadSerializedOverride(true);
     try {
       Object newValue = event.getNewValue();
       if (!(newValue instanceof PdxInstance)) {
@@ -89,14 +89,14 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback 
implements CacheWrite
           newValue = CopyHelper.copy(newValue);
         }
         if (newValue != null && !(newValue instanceof PdxInstance)) {
-          String valueClassName = newValue == null ? "null" : 
newValue.getClass().getName();
+          String valueClassName = newValue.getClass().getName();
           throw new IllegalArgumentException(getClass().getSimpleName()
               + " only supports PDX values; newValue is " + valueClassName);
         }
       }
       return (PdxInstance) newValue;
     } finally {
-      internalCache.getPdxRegistry().setPdxReadSerializedOverride(false);
+      
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
     }
   }
 }
diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 8f28d67..98bb42a 100644
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -24,6 +24,7 @@ import org.apache.geode.internal.cache.InternalCache;
 public abstract class AbstractJdbcCallback implements CacheCallback {
 
   private volatile SqlHandler sqlHandler;
+  protected volatile InternalCache cache;
 
   protected AbstractJdbcCallback() {
     // nothing
@@ -57,6 +58,7 @@ public abstract class AbstractJdbcCallback implements 
CacheCallback {
 
   private synchronized void initialize(InternalCache cache) {
     if (sqlHandler == null) {
+      this.cache = cache;
       JdbcConnectorService service = 
cache.getService(JdbcConnectorService.class);
       DataSourceManager manager = new DataSourceManager(new 
HikariJdbcDataSourceFactory());
       sqlHandler = new SqlHandler(manager, service);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 27501fc..1beb0cb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -319,6 +319,7 @@ public class DefaultQuery implements Query {
     QueryExecutor qe = checkQueryOnPR(params);
 
     Object result = null;
+    Boolean initialPdxReadSerialized = 
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
     try {
       // Setting the readSerialized flag for local queries
       this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
@@ -398,7 +399,7 @@ public class DefaultQuery implements Query {
             "Query was canceled. It may be due to low memory or the query was 
running longer than the MAX_QUERY_EXECUTION_TIME.");
       }
     } finally {
-      this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+      
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
       if (queryMonitor != null) {
         queryMonitor.stopMonitoringQueryThread(Thread.currentThread(), this);
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index 6880465..4340b7c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -1001,6 +1001,7 @@ public class IndexManager {
    */
   private void processAction(RegionEntry entry, int action, int opCode) throws 
QueryException {
     final long startPA = getCachePerfStats().startIndexUpdate();
+    Boolean initialPdxReadSerialized = 
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
     this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
     TXStateProxy tx = null;
     if (!this.cache.isClient()) {
@@ -1146,7 +1147,7 @@ public class IndexManager {
         }
       }
     } finally {
-      this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+      
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
       ((TXManagerImpl) 
this.cache.getCacheTransactionManager()).unpauseTransaction(tx);
 
       getCachePerfStats().endIndexUpdate(startPA);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
index 1c3ba3e..1ad5ec0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
@@ -505,6 +505,7 @@ public abstract class StreamingOperation {
       Version senderVersion = 
InternalDataSerializer.getVersionForDataStream(in);
       boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0;
       InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
+      Boolean initialPdxReadSerialized = 
cache.getPdxRegistry().getPdxReadSerializedOverride();
       if (n == -1) {
         this.objectList = null;
       } else {
@@ -559,7 +560,7 @@ public abstract class StreamingOperation {
           }
         } finally {
           if (this.pdxReadSerialized) {
-            cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+            
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
           }
         }
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 0410502..8098c41 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -169,6 +169,8 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
 
     DefaultQuery query = new DefaultQuery(this.queryString, pr.getCache(), 
false);
     // Remote query, use the PDX types in serialized form.
+    Boolean initialPdxReadSerialized =
+        pr.getCache().getPdxRegistry().getPdxReadSerializedOverride();
     pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(true);
     // In case of "select *" queries we can keep the results in serialized 
form and send
     query.setRemoteQuery(true);
@@ -249,7 +251,7 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
       if (isQueryTraced) {
         this.resultCollector.remove(queryTraceList);
       }
-      pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(false);
+      
pr.getCache().getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
       query.setRemoteQuery(false);
       query.endTrace(indexObserver, traceStartTime, this.resultCollector);
     }
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
index d40708b..8f99f87 100644
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
@@ -1187,11 +1187,12 @@ public class CqServiceImpl implements CqService {
       processRegionEvent(event, localProfile, profiles, frInfo);
     } else {
       // Use the PDX types in serialized form.
+      Boolean initialPdxReadSerialized = 
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
       this.cache.getPdxRegistry().setPdxReadSerializedOverride(true);
       try {
         processEntryEvent(event, localProfile, profiles, frInfo);
       } finally {
-        this.cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+        
this.cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
       }
     }
   }
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index bfaee79..ee64cc1 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -138,9 +138,7 @@ public class IndexRepositoryFactory {
       throws IOException {
     Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
 
-    Iterator keysIterator = dataBucket.keySet().iterator();
-    while (keysIterator.hasNext()) {
-      Object key = keysIterator.next();
+    for (Object key : dataBucket.keySet()) {
       Object value = getValue(userRegion.getEntry(key));
       if (value != null) {
         repo.update(key, value);
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index 2a227f7..b9209e7 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -78,6 +78,7 @@ public class LuceneEventListener implements 
AsyncEventListener {
 
   protected boolean process(final List<AsyncEvent> events) {
     // Try to get a PDX instance if possible, rather than a deserialized object
+    Boolean initialPdxReadSerialized = 
this.cache.getPdxRegistry().getPdxReadSerializedOverride();
     cache.getPdxRegistry().setPdxReadSerializedOverride(true);
 
     Set<IndexRepository> affectedRepos = new HashSet<>();
@@ -117,7 +118,7 @@ public class LuceneEventListener implements 
AsyncEventListener {
     } catch (IOException e) {
       throw new InternalGemFireError("Unable to save to lucene index", e);
     } finally {
-      cache.getPdxRegistry().setPdxReadSerializedOverride(false);
+      
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to