This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new d8263c4 ExchangeService API
d8263c4 is described below
commit d8263c423c1c07fed16fc64d932a1067ffefc4e9
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Dec 13 15:40:38 2019 +0300
ExchangeService API
---
.../query/calcite/exchange/ExchangeProcessor.java | 7 ++-
.../processors/query/calcite/exchange/Inbox.java | 51 +++++++++++++++++++++-
.../processors/query/calcite/exchange/Outbox.java | 11 ++---
.../query/calcite/exchange/OutboxTest.java | 8 ++--
4 files changed, 62 insertions(+), 15 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 6a6810a..803417c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -24,10 +24,13 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*
*/
public interface ExchangeProcessor {
+ int BATCH_SIZE = 200;
+ int PER_NODE_BATCH_COUNT = 10;
+
<T> Outbox<T> register(Outbox<T> outbox);
<T> void unregister(Outbox<T> outbox);
- Inbox register(Inbox inbox);
- void unregister(Inbox inbox);
+ <T> Inbox<T> register(Inbox<T> inbox);
+ <T> void unregister(Inbox<T> inbox);
void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?> rows);
void acknowledge(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
index 83a8099..bd515a5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Inbox.java
@@ -16,8 +16,55 @@
package org.apache.ignite.internal.processors.query.calcite.exchange;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.SingleNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
+import org.apache.ignite.internal.processors.query.calcite.exec.Source;
+
/**
- *
+ * TODO
*/
-public class Inbox {
+public class Inbox<T> implements SingleNode<T> {
+ private final GridCacheVersion queryId;
+ private final long exchangeId;
+
+ private Sink<T> target;
+ private Collection<UUID> sources;
+ private Comparator<T> comparator;
+ private ExchangeProcessor srvc;
+
+ public Inbox(GridCacheVersion queryId, long exchangeId) {
+ this.queryId = queryId;
+ this.exchangeId = exchangeId;
+ }
+
+ public void bind(Sink<T> target, Collection<UUID> sources, Comparator<T> comparator) {
+ this.target = target;
+ this.sources = sources;
+ this.comparator = comparator;
+ }
+
+ void init(ExchangeProcessor srvc) {
+ this.srvc = srvc;
+ }
+
+ @Override public void signal() {
+ // No-op.
+ }
+
+ @Override public void sources(List<Source> sources) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public Sink<T> sink(int idx) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void push(UUID source, int batchId, List<?> rows) {
+
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
index 9099b46..f118330 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -33,9 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
*
*/
public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> {
- static final int BATCH_SIZE = 200;
- static final int PER_NODE_BATCH_COUNT = 10;
-
private final Map<UUID, Destination> perNode = new HashMap<>();
private final GridCacheVersion queryId;
@@ -112,7 +109,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
private int hwm = -1;
private int lwm = -1;
- private ArrayList<Object> curr = new ArrayList<>(BATCH_SIZE + 1); // extra space for end marker;
+ private ArrayList<Object> curr = new ArrayList<>(ExchangeProcessor.BATCH_SIZE + 1); // extra space for end marker;
private boolean needSignal;
@@ -121,12 +118,12 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
}
public void add(T row) {
- if (curr.size() == BATCH_SIZE) {
+ if (curr.size() == ExchangeProcessor.BATCH_SIZE) {
assert ready() && srvc != null;
srvc.send(queryId, exchangeId, nodeId, ++hwm, curr);
- curr = new ArrayList<>(BATCH_SIZE);
+ curr = new ArrayList<>(ExchangeProcessor.BATCH_SIZE);
}
curr.add(row);
@@ -144,7 +141,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
}
boolean ready() {
- return hwm - lwm < PER_NODE_BATCH_COUNT || curr.size() < BATCH_SIZE;
+ return hwm - lwm < ExchangeProcessor.PER_NODE_BATCH_COUNT || curr.size() < ExchangeProcessor.BATCH_SIZE;
}
void acknowledge(int id) {
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index 61c8a7d..eaac7d1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -61,7 +61,7 @@ public class OutboxTest extends GridCommonAbstractTest {
source.signal = false;
- int maxRows = Outbox.BATCH_SIZE * (Outbox.PER_NODE_BATCH_COUNT + 1);
+ int maxRows = ExchangeProcessor.BATCH_SIZE * (ExchangeProcessor.PER_NODE_BATCH_COUNT + 1);
int rows = 0;
while (sink.push(new Object[]{new Object()})) {
@@ -74,7 +74,7 @@ public class OutboxTest extends GridCommonAbstractTest {
assertFalse(exch.ids.isEmpty());
- assertEquals(Outbox.PER_NODE_BATCH_COUNT, exch.ids.size());
+ assertEquals(ExchangeProcessor.PER_NODE_BATCH_COUNT, exch.ids.size());
assertFalse(sink.push(new Object[]{new Object()}));
@@ -123,11 +123,11 @@ public class OutboxTest extends GridCommonAbstractTest {
lastBatch = rows;
}
- @Override public Inbox register(Inbox inbox) {
+ @Override public <T> Inbox<T> register(Inbox<T> inbox) {
throw new AssertionError();
}
- @Override public void unregister(Inbox inbox) {
+ @Override public <T> void unregister(Inbox<T> inbox) {
throw new AssertionError();
}