Repository: geode
Updated Branches:
  refs/heads/feature/GEM-1299 5f991020c -> e289f4037


fix-2


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e289f403
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e289f403
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e289f403

Branch: refs/heads/feature/GEM-1299
Commit: e289f4037585755f72ffa053493f436e926989f2
Parents: 5f99102
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Apr 20 15:19:06 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 20 15:19:06 2017 -0700

----------------------------------------------------------------------
 .../PokeLuceneAsyncQueueFunction.java           | 95 ++++++++++++++++++++
 1 file changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/e289f403/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
new file mode 100644
index 0000000..992972b
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.lucene.internal.distributed;
+
+import java.util.List;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.internal.InternalEntity;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
+  private static final long serialVersionUID = 1L;
+  public static final String ID = PokeLuceneAsyncQueueFunction.class.getName();
+
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public void execute(FunctionContext context) {
+    Object[] args = (Object[]) context.getArguments();
+    queueLocally(context, (String) args[0], args[1], (GatewaySenderEventImpl) 
args[2]);
+  }
+
+  protected void queueLocally(FunctionContext context, String regionName, 
Object key,
+      GatewaySenderEventImpl event) {
+    // Get the AsyncEventQueue
+    RegionFunctionContext ctx = (RegionFunctionContext) context;
+
+    PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet();
+    Cache cache = pr.getCache();
+    String queueId = (String) 
pr.getAttributes().getAsyncEventQueueIds().iterator().next();
+    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) 
cache.getAsyncEventQueue(queueId);
+
+    // Get the GatewaySender
+    AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
+
+    // Update the shadow key
+    BucketRegion br = pr.getBucketRegion(key);
+    if (br.getBucketAdvisor().isPrimary()) {
+      try {
+        List<ParallelGatewaySenderEventProcessor> processors =
+            ((ConcurrentParallelGatewaySenderEventProcessor) 
sender.getEventProcessor())
+                .getProcessors();
+        ParallelGatewaySenderEventProcessor processor =
+            processors.get(event.getBucketId() % 
sender.getDispatcherThreads());
+        processor.getQueue().put(event);
+      } catch (InterruptedException e) {
+
+      }
+    }
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return false;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+}

Reply via email to