This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f35f6da3198 IGNITE-24966 Sql. Improve OrderedMergePublisher
performance (#5525)
f35f6da3198 is described below
commit f35f6da31982029af2e4f38be00e58b58572c47b
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Apr 3 19:44:46 2025 +0300
IGNITE-24966 Sql. Improve OrderedMergePublisher performance (#5525)
---
.../util/subscription/OrderedMergePublisher.java | 199 +++++++++++++++------
1 file changed, 140 insertions(+), 59 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
index ebb5aee94cd..20f45c3241b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.util.subscription;
+import static it.unimi.dsi.fastutil.objects.ObjectArrays.swap;
+
+import it.unimi.dsi.fastutil.IndirectPriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapIndirectPriorityQueue;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
@@ -86,17 +90,22 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
final Subscriber<? super T> downstream;
/** Counter to prevent concurrent execution of a critical section. */
- private final AtomicInteger guardCntr = new AtomicInteger();
+ // Set initial value to 1 to prevent data processing until all
subscribers are subscribed to their sources (see subscribe()).
+ private final AtomicInteger guardCntr = new AtomicInteger(1);
/** Subscribers. */
private final OrderedMergeSubscriber<T>[] subscribers;
- /** Rows comparator. */
- private final Comparator<? super T> comp;
-
/** Last received values. */
private final Object[] values;
+ /** Values view sorted in comparator order. */
+ private final IndirectPriorityQueue<Object> valuesQueue;
+
+ /** Processing state. */
+ @SuppressWarnings({"unused", "FieldMayBeFinal"})
+ private State state = State.INITIAL;
+
/** Error. */
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private ErrorChain errorChain;
@@ -109,6 +118,9 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private long requested;
+ // Number of sources that have neither delivered a first value nor completed yet.
+ private int waiting;
+
/** Number of emitted rows (guarded by {@link #guardCntr}). */
private long emitted;
@@ -116,6 +128,8 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
static final VarHandle CANCELLED;
+ static final VarHandle STATE;
+
static final VarHandle REQUESTED;
static {
@@ -125,6 +139,7 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
ERROR_CHAIN = lk.findVarHandle(OrderedMergeSubscription.class,
"errorChain", ErrorChain.class);
CANCELLED = lk.findVarHandle(OrderedMergeSubscription.class,
"cancelled", boolean.class);
REQUESTED = lk.findVarHandle(OrderedMergeSubscription.class,
"requested", long.class);
+ STATE = lk.findVarHandle(OrderedMergeSubscription.class,
"state", State.class);
} catch (Throwable ex) {
throw new InternalError(ex);
}
@@ -140,7 +155,6 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
*/
OrderedMergeSubscription(Subscriber<? super T> downstream,
Comparator<? super T> comp, int prefetch, int cnt) {
this.downstream = downstream;
- this.comp = comp;
this.subscribers = new OrderedMergeSubscriber[cnt];
for (int i = 0; i < cnt; i++) {
@@ -148,12 +162,16 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
}
this.values = new Object[cnt];
+ this.valuesQueue = new ObjectHeapIndirectPriorityQueue(values,
comp);
+ this.waiting = cnt;
}
void subscribe(Publisher<? extends T>[] sources) {
for (int i = 0; i < sources.length; i++) {
sources[i].subscribe(subscribers[i]);
}
+
+ guardCntr.set(0);
}
/** {@inheritDoc} */
@@ -179,11 +197,13 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
@Override
public void cancel() {
if (CANCELLED.compareAndSet(this, false, true)) {
+ STATE.setRelease(this, State.STOP);
+
for (OrderedMergeSubscriber<T> inner : subscribers) {
inner.cancel();
}
- if (guardCntr.getAndIncrement() == 0) {
+ if (guardCntr.get() == 0) {
Arrays.fill(values, null);
for (OrderedMergeSubscriber<T> inner : subscribers) {
@@ -221,86 +241,124 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
// Frequently accessed fields.
Subscriber<? super T> downstream = this.downstream;
OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
- int subsCnt = subscribers.length;
Object[] values = this.values;
+
long emitted = this.emitted;
+ // Retry loop.
for (; ; ) {
- long requested = (long) REQUESTED.getAcquire(this);
+ switch ((State) STATE.getAcquire(this)) {
+ case INITIAL: {
+ int waiting = this.waiting;
- for (; ; ) {
- if ((boolean) CANCELLED.getAcquire(this)) {
- Arrays.fill(values, null);
+ // Moves non-initialized sources to the beginning of
the array for faster array scans
+ // in the case of long initialization.
+ for (int i = 0; i < waiting; ) {
+ boolean innerDone = subscribers[0].done; // Read
before polling to preserve correct program order.
+ Object obj = subscribers[0].queue.poll();
- for (OrderedMergeSubscriber<T> inner : subscribers) {
- inner.queue.clear();
- }
+ int done = (obj == null && innerDone) ? 1 : 0; //
Flag has no effect if poll was successful.
+ int initialized = obj != null || innerDone ? 1 : 0;
- return;
- }
+ values[0] = done > 0 ? DONE : obj;
- int completed = 0;
- boolean waitResponse = false;
+ waiting -= initialized;
- for (int i = 0; i < subsCnt; i++) {
- Object obj = values[i];
+ int move = initialized * waiting; // No effect if
value wasn't initialized.
+ swap(values, 0, move);
+ swap(subscribers, 0, move);
- if (obj == DONE) {
- completed++;
- } else if (obj == null) {
- boolean innerDone = subscribers[i].done;
+ i = (initialized == 0) ? waiting : i; // Exit if
the current source has not delivered anything yet.
+ }
- obj = subscribers[i].queue.poll();
+ this.waiting = waiting;
- if (obj != null) {
- values[i] = obj;
- } else if (innerDone) {
- values[i] = DONE;
+ if (waiting == 0) {
+ // Got first rows from all subscribers.
+ // Add all non-completed sources to the priority
queue.
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] != DONE) {
+ valuesQueue.enqueue(i);
+ }
+ }
- completed++;
- } else {
- // Subscriber has not received a response yet.
- waitResponse = true;
+ // Then either start merge process or proceed with
finishing if there is nothing to do.
+ State state = valuesQueue.isEmpty() ?
State.COMPLETING : State.RUNNING;
+ STATE.compareAndSet(this, State.INITIAL, state);
- break;
- }
+ continue;
}
+
+ break;
}
+ case RUNNING: {
+ long requested = (long) REQUESTED.getAcquire(this);
+
+ // Emit loop.
+ while (!valuesQueue.isEmpty()) {
+ int minIndex = valuesQueue.first();
+
+ if (values[minIndex] == null) {
+ boolean done = subscribers[minIndex].done;
+ T val = subscribers[minIndex].queue.poll();
+
+ if (val != null) {
+ values[minIndex] = val;
+ valuesQueue.changed(); // Force queue move
the new value to its place.
+ minIndex = valuesQueue.first();
+ } else if (done) {
+ // No more values to emit for the current
source, remove it from queue.
+ valuesQueue.dequeue();
+ continue;
+ } else {
+ // Nothing to do, value wasn't received
yet.
+ break;
+ }
+ }
- if (completed == subsCnt) {
- ErrorChain chain = (ErrorChain)
ERROR_CHAIN.getAcquire(this);
+ if (emitted == requested) {
+ break;
+ }
+
+ downstream.onNext((T) values[minIndex]);
+ emitted++;
- if (chain == null) {
- downstream.onComplete();
- } else {
- downstream.onError(chain.buildThrowable());
+ values[minIndex] = null;
+ subscribers[minIndex].request(1);
}
- return;
- }
+ if (valuesQueue.isEmpty()) {
+ STATE.compareAndSet(this, State.RUNNING,
State.COMPLETING);
+
+ continue;
+ }
- if (waitResponse || emitted == requested) {
break;
}
+ case COMPLETING: {
+ STATE.set(this, State.STOP);
- T min = null;
- int minIndex = -1;
+ // If the subscription was cancelled, there is no need
to notify downstream.
+ if (!(boolean) CANCELLED.getAcquire(this)) {
+ assert valuesQueue.isEmpty();
- for (int i = 0; i < values.length; i++) {
- Object obj = values[i];
+ finish(downstream);
+ }
- if (obj != DONE && (min == null || comp.compare(min,
(T) obj) > 0)) {
- min = (T) obj;
- minIndex = i;
+ // Cleanup.
+ Arrays.fill(values, null);
+ for (OrderedMergeSubscriber<T> inner : subscribers) {
+ inner.queue.clear();
}
+ // No need to release guard.
+ return;
}
-
- values[minIndex] = null;
-
- downstream.onNext(min);
-
- emitted++;
- subscribers[minIndex].request(1);
+ case STOP: {
+ // Terminal state. No need to release guard.
+ return;
+ }
+ default:
+ throw new IllegalStateException("Should never get
here.");
}
this.emitted = emitted;
@@ -309,6 +367,17 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
if (guardCntr.decrementAndGet() == 0) {
break;
}
+ guardCntr.set(1);
+ }
+ }
+
+ /**
+ * Sends the terminal signal to downstream: {@code onComplete()} when no error chain has been recorded,
+ * otherwise {@code onError()} with the throwable built from the recorded error chain.
+ *
+ * @param downstream Subscriber to notify.
+ */
+ private void finish(Subscriber<? super T> downstream) {
+ ErrorChain chain = (ErrorChain) ERROR_CHAIN.getAcquire(this);
+
+ if (chain == null) {
+ downstream.onComplete();
+ } else {
+ downstream.onError(chain.buildThrowable());
}
}
@@ -377,7 +446,7 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
/** {@inheritDoc} */
@Override
- public void request(long n) {
+ public synchronized void request(long n) {
int c = consumed + 1;
if (c == limit) {
@@ -435,4 +504,16 @@ public class OrderedMergePublisher<T> implements
Publisher<T> {
return error;
}
}
+
+ /**
+ * Merge process states. Transitions: INITIAL -> RUNNING -> COMPLETING -> STOP; INITIAL goes straight to
+ * COMPLETING when every source completed without delivering data, and cancel() moves any state to STOP.
+ */
+ private enum State {
+ /** Waiting until each source subscriber has delivered its first row or completed. */
+ INITIAL,
+ /** Merging buffered rows in comparator order and emitting them downstream. */
+ RUNNING,
+ /** All sources are exhausted: notify downstream (skipped if cancelled meanwhile), then clear buffers. */
+ COMPLETING,
+ /** Terminal state: drain does nothing further. */
+ STOP
+ }
}