This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new d8263c4  ExchangeService API
d8263c4 is described below

commit d8263c423c1c07fed16fc64d932a1067ffefc4e9
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Dec 13 15:40:38 2019 +0300

    ExchangeService API
---
 .../query/calcite/exchange/ExchangeProcessor.java  |  7 ++-
 .../processors/query/calcite/exchange/Inbox.java   | 51 +++++++++++++++++++++-
 .../processors/query/calcite/exchange/Outbox.java  | 11 ++---
 .../query/calcite/exchange/OutboxTest.java         |  8 ++--
 4 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 6a6810a..803417c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -24,10 +24,13 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  *
  */
 public interface ExchangeProcessor {
+    int BATCH_SIZE = 200;
+    int PER_NODE_BATCH_COUNT = 10;
+
     <T> Outbox<T> register(Outbox<T> outbox);
     <T> void unregister(Outbox<T> outbox);
-    Inbox register(Inbox inbox);
-    void unregister(Inbox inbox);
+    <T> Inbox<T> register(Inbox<T> inbox);
+    <T> void unregister(Inbox<T> inbox);
     void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?> rows);
     void acknowledge(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
index 83a8099..bd515a5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
@@ -16,8 +16,55 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exchange;
 
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.SingleNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import org.apache.ignite.internal.processors.query.calcite.exec.Source;
+
 /**
- *
+ * TODO
  */
-public class Inbox {
+public class Inbox<T> implements SingleNode<T> {
+    private final GridCacheVersion queryId;
+    private final long exchangeId;
+
+    private Sink<T> target;
+    private Collection<UUID> sources;
+    private Comparator<T> comparator;
+    private ExchangeProcessor srvc;
+
+    public Inbox(GridCacheVersion queryId, long exchangeId) {
+        this.queryId = queryId;
+        this.exchangeId = exchangeId;
+    }
+
+    public void bind(Sink<T> target, Collection<UUID> sources, Comparator<T> comparator) {
+        this.target = target;
+        this.sources = sources;
+        this.comparator = comparator;
+    }
+
+    void init(ExchangeProcessor srvc) {
+        this.srvc = srvc;
+    }
+
+    @Override public void signal() {
+        // No-op.
+    }
+
+    @Override public void sources(List<Source> sources) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Sink<T> sink(int idx) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void push(UUID source, int batchId, List<?> rows) {
+
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
index 9099b46..f118330 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -33,9 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
  *
  */
 public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> {
-    static final int BATCH_SIZE = 200;
-    static final int PER_NODE_BATCH_COUNT = 10;
-
     private final Map<UUID, Destination> perNode = new HashMap<>();
 
     private final GridCacheVersion queryId;
@@ -112,7 +109,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
         private int hwm = -1;
         private int lwm = -1;
 
-        private ArrayList<Object> curr = new ArrayList<>(BATCH_SIZE + 1); // extra space for end marker;
+        private ArrayList<Object> curr = new ArrayList<>(ExchangeProcessor.BATCH_SIZE + 1); // extra space for end marker;
 
         private boolean needSignal;
 
@@ -121,12 +118,12 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
         }
 
         public void add(T row) {
-            if (curr.size() == BATCH_SIZE) {
+            if (curr.size() == ExchangeProcessor.BATCH_SIZE) {
                 assert ready() && srvc != null;
 
                 srvc.send(queryId, exchangeId, nodeId, ++hwm, curr);
 
-                curr = new ArrayList<>(BATCH_SIZE);
+                curr = new ArrayList<>(ExchangeProcessor.BATCH_SIZE);
             }
 
             curr.add(row);
@@ -144,7 +141,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
         }
 
         boolean ready() {
-            return hwm - lwm < PER_NODE_BATCH_COUNT || curr.size() < BATCH_SIZE;
+            return hwm - lwm < ExchangeProcessor.PER_NODE_BATCH_COUNT || curr.size() < ExchangeProcessor.BATCH_SIZE;
         }
 
         void acknowledge(int id) {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index 61c8a7d..eaac7d1 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -61,7 +61,7 @@ public class OutboxTest extends GridCommonAbstractTest {
 
         source.signal = false;
 
-        int maxRows = Outbox.BATCH_SIZE * (Outbox.PER_NODE_BATCH_COUNT + 1);
+        int maxRows = ExchangeProcessor.BATCH_SIZE * (ExchangeProcessor.PER_NODE_BATCH_COUNT + 1);
         int rows = 0;
 
         while (sink.push(new Object[]{new Object()})) {
@@ -74,7 +74,7 @@ public class OutboxTest extends GridCommonAbstractTest {
 
         assertFalse(exch.ids.isEmpty());
 
-        assertEquals(Outbox.PER_NODE_BATCH_COUNT, exch.ids.size());
+        assertEquals(ExchangeProcessor.PER_NODE_BATCH_COUNT, exch.ids.size());
 
         assertFalse(sink.push(new Object[]{new Object()}));
 
@@ -123,11 +123,11 @@ public class OutboxTest extends GridCommonAbstractTest {
             lastBatch = rows;
         }
 
-        @Override public Inbox register(Inbox inbox) {
+        @Override public <T> Inbox<T> register(Inbox<T> inbox) {
             throw new AssertionError();
         }
 
-        @Override public void unregister(Inbox inbox) {
+        @Override public <T> void unregister(Inbox<T> inbox) {
             throw new AssertionError();
         }
 

Reply via email to