This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 67b3167075 IGNITE-23621 Data Streamer: report failed entries on error (#4691)
67b3167075 is described below
commit 67b3167075af3f08a1a6a3153643bb0e1bf5bec4
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Nov 8 17:27:48 2024 +0200
IGNITE-23621 Data Streamer: report failed entries on error (#4691)
* Save all items that were not successfully sent to the cluster and return
them in DataStreamerException#failedItems
* Refuse items when closed (normally or due to an exception)
---
.../apache/ignite/table/DataStreamerException.java | 84 +++++++++++++++++++
...ctClientStreamerPartitionAwarenessProvider.java | 13 +--
.../org/apache/ignite/client/DataStreamerTest.java | 14 +++-
.../ignite/internal/streamer/StreamerBuffer.java | 28 +++----
.../internal/streamer/StreamerSubscriber.java | 87 ++++++++++++++------
.../ignite/internal/streamer/DirectPublisher.java | 64 +++++++++++++++
.../streamer/ItAbstractDataStreamerTest.java | 93 +++++++++++++++++++---
.../sql/engine/ItNotNullConstraintTest.java | 5 +-
8 files changed, 322 insertions(+), 66 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java
new file mode 100644
index 0000000000..93fd3ec12c
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents an exception that is thrown during data streaming. Includes
information about failed items.
+ */
+public final class DataStreamerException extends IgniteException {
+ private static final long serialVersionUID = -7516813059448101081L;
+
+ private final Set<?> failedItems;
+
+ /**
+ * Constructor.
+ *
+ * @param failedItems Set of failed items.
+ * @param cause Cause of the exception.
+ */
+ public DataStreamerException(Set<?> failedItems, Throwable cause) {
+ super(getCode(cause), cause.getMessage(), cause);
+
+ this.failedItems = failedItems;
+ }
+
+ /**
+ * Creates an exception with the given trace ID, error code, detailed
message, and cause.
+ *
+ * @param traceId Unique identifier of the exception.
+ * @param code Full error code.
+ * @param message Detailed message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public DataStreamerException(UUID traceId, int code, String message,
@Nullable Throwable cause) {
+ super(traceId, code, message, cause);
+ this.failedItems = Set.of();
+ }
+
+ /**
+ * Gets the set of items that were not streamed to the cluster.
+ *
+ * @return Set of failed items.
+ */
+ public Set<?> failedItems() {
+ return failedItems;
+ }
+
+ private static int getCode(Throwable cause) {
+ if (cause instanceof IgniteException) {
+ IgniteException e = (IgniteException) cause;
+
+ return e.code();
+ }
+
+ if (cause.getCause() instanceof IgniteException) {
+ IgniteException e = (IgniteException) cause.getCause();
+
+ return e.code();
+ }
+
+ return INTERNAL_ERR;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
index 0cd37f657e..b79616c0ef 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
@@ -27,8 +27,8 @@ import
org.apache.ignite.internal.streamer.StreamerPartitionAwarenessProvider;
*/
abstract class AbstractClientStreamerPartitionAwarenessProvider<T> implements
StreamerPartitionAwarenessProvider<T, Integer> {
private final ClientTable tbl;
- private int partitions = -1;
- private ClientSchema schema;
+ private volatile int partitions = -1;
+ private volatile ClientSchema schema;
AbstractClientStreamerPartitionAwarenessProvider(ClientTable tbl) {
this.tbl = tbl;
@@ -36,12 +36,15 @@ abstract class
AbstractClientStreamerPartitionAwarenessProvider<T> implements St
@Override
public Integer partition(T item) {
- if (schema == null || partitions < 0) {
+ ClientSchema schema0 = schema;
+ int partitions0 = partitions;
+
+ if (schema0 == null || partitions0 < 0) {
throw new
IllegalStateException("StreamerPartitionAwarenessProvider.refresh() was not
called or awaited.");
}
- int hash = colocationHash(schema, item);
- return Math.abs(hash % partitions);
+ int hash = colocationHash(schema0, item);
+ return Math.abs(hash % partitions0);
}
abstract int colocationHash(ClientSchema schema, T item);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
index a4813fb9f0..bca9099035 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
@@ -18,7 +18,7 @@
package org.apache.ignite.client;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -61,6 +61,7 @@ import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.DataStreamerReceiver;
@@ -627,10 +628,15 @@ public class DataStreamerTest extends
AbstractClientTableTest {
}
}
- assertThat(streamerFut, willThrow(ArithmeticException.class, "Result
subscriber exception"));
+ assertThat(streamerFut,
willThrowWithCauseOrSuppressed(ArithmeticException.class, "Result subscriber
exception"));
assertFalse(resultSubscriber.completed.get());
- assertInstanceOf(CompletionException.class,
resultSubscriber.error.get());
- assertInstanceOf(ArithmeticException.class,
resultSubscriber.error.get().getCause());
+
+ Throwable subscriberErr = resultSubscriber.error.get();
+ assertInstanceOf(DataStreamerException.class, subscriberErr);
+ assertInstanceOf(CompletionException.class, subscriberErr.getCause());
+ assertInstanceOf(ArithmeticException.class,
subscriberErr.getCause().getCause());
+
+ assertEquals(3, ((DataStreamerException)
subscriberErr).failedItems().size());
}
@Test
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
index b79c8e7f75..20d9cb1cd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
@@ -18,27 +18,23 @@
package org.apache.ignite.internal.streamer;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.List;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
class StreamerBuffer<T> {
private final int capacity;
- private final BiConsumer<List<T>, BitSet> flusher;
+ private final Consumer<List<T>> flusher;
/** Primary buffer. Won't grow over capacity. */
private List<T> buf;
- private BitSet deleted;
-
private boolean closed;
- StreamerBuffer(int capacity, BiConsumer<List<T>, BitSet> flusher) {
+ StreamerBuffer(int capacity, Consumer<List<T>> flusher) {
this.capacity = capacity;
this.flusher = flusher;
buf = new ArrayList<>(capacity);
- deleted = new BitSet(capacity);
}
/**
@@ -46,21 +42,16 @@ class StreamerBuffer<T> {
*
* @param item Item.
*/
- synchronized void add(T item, boolean delete) {
+ synchronized void add(T item) {
if (closed) {
throw new IllegalStateException("Streamer is closed, can't add
items.");
}
buf.add(item);
- if (delete) {
- deleted.set(buf.size() - 1);
- }
-
if (buf.size() >= capacity) {
- flusher.accept(buf, deleted);
+ flusher.accept(buf);
buf = new ArrayList<>(capacity);
- deleted = new BitSet(capacity);
}
}
@@ -72,7 +63,7 @@ class StreamerBuffer<T> {
closed = true;
if (!buf.isEmpty()) {
- flusher.accept(buf, deleted);
+ flusher.accept(buf);
}
}
@@ -81,8 +72,11 @@ class StreamerBuffer<T> {
return;
}
- flusher.accept(buf, deleted);
+ flusher.accept(buf);
buf = new ArrayList<>(capacity);
- deleted = new BitSet(capacity);
+ }
+
+ synchronized void forEach(Consumer<T> consumer) {
+ buf.forEach(consumer);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 14fe51fd7b..5087a76287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -17,8 +17,13 @@
package org.apache.ignite.internal.streamer;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -32,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.table.DataStreamerException;
import org.jetbrains.annotations.Nullable;
/**
@@ -66,7 +72,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
// NOTE: This can accumulate empty buffers for stopped/failed nodes.
Cleaning up is not trivial in concurrent scenario.
// We don't expect thousands of node failures, so it should be fine.
- private final ConcurrentHashMap<P, StreamerBuffer<V>> buffers = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<P, StreamerBuffer<E>> buffers = new
ConcurrentHashMap<>();
private final ConcurrentMap<P, CompletableFuture<Collection<R>>>
pendingRequests = new ConcurrentHashMap<>();
@@ -76,6 +82,8 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
private final ScheduledExecutorService flushExecutor;
+ private final Set<E> failedItems = Collections.synchronizedSet(new
HashSet<>());
+
private @Nullable Flow.Subscription subscription;
private @Nullable ResultSubscription resultSubscription;
@@ -159,20 +167,21 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
/** {@inheritDoc} */
@Override
- public void onNext(E item) {
+ public synchronized void onNext(E item) {
+ if (closed) {
+ throw new IllegalStateException("Streamer is closed, can't add
items.");
+ }
+
pendingItemCount.decrementAndGet();
T key = keyFunc.apply(item);
P partition = partitionAwarenessProvider.partition(key);
- StreamerBuffer<V> buf = buffers.computeIfAbsent(
+ StreamerBuffer<E> buf = buffers.computeIfAbsent(
partition,
- p -> new StreamerBuffer<>(options.pageSize(), (items, deleted)
-> enlistBatch(p, items, deleted)));
+ p -> new StreamerBuffer<>(options.pageSize(), items ->
enlistBatch(p, items)));
- V payload = payloadFunc.apply(item);
- boolean delete = deleteFunc.apply(item);
-
- buf.add(payload, delete);
+ buf.add(item);
this.metrics.streamerItemsQueuedAdd(1);
requestMore();
@@ -199,7 +208,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
return completionFut;
}
- private void enlistBatch(P partition, Collection<V> batch, BitSet deleted)
{
+ private void enlistBatch(P partition, List<E> batch) {
int batchSize = batch.size();
assert batchSize > 0 : "Batch size must be positive.";
assert partition != null : "Partition must not be null.";
@@ -210,18 +219,34 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
pendingRequests.compute(
partition,
// Chain existing futures to preserve request order.
- (part, fut) -> fut == null ? sendBatch(part, batch, deleted) :
fut.thenCompose(v -> sendBatch(part, batch, deleted))
+ (part, fut) -> fut == null
+ ? sendBatch(part, batch)
+ : fut.whenComplete((res, err) -> {
+ if (err != null) {
+ failedItems.addAll(batch);
+ }
+ }).thenCompose(v -> sendBatch(part, batch))
);
}
- private CompletableFuture<Collection<R>> sendBatch(P partition,
Collection<V> batch, BitSet deleted) {
+ private CompletableFuture<Collection<R>> sendBatch(P partition, List<E>
batch) {
// If a connection fails, the batch goes to default connection thanks
to built-it retry mechanism.
try {
- return batchSender.sendAsync(partition, batch,
deleted).whenComplete((res, err) -> {
+ var items = new ArrayList<V>();
+ var deleted = new BitSet(batch.size());
+
+ for (E e : batch) {
+ items.add(payloadFunc.apply(e));
+ deleted.set(items.size() - 1, deleteFunc.apply(e));
+ }
+
+ return batchSender.sendAsync(partition, items,
deleted).whenComplete((res, err) -> {
if (err != null) {
// Retry is handled by the sender (RetryPolicy in
ReliableChannel on the client, sendWithRetry on the server).
// If we get here, then retries are exhausted and we
should fail the streamer.
log.error("Failed to send batch to partition " + partition
+ ": " + err.getMessage(), err);
+
+ failedItems.addAll(batch);
close(err);
} else {
int batchSize = batch.size();
@@ -244,8 +269,10 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
invokeResultSubscriber(res);
}
});
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Failed to send batch to partition " + partition + ": "
+ e.getMessage(), e);
+
+ failedItems.addAll(batch);
close(e);
return CompletableFuture.failedFuture(e);
}
@@ -276,6 +303,12 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
}
private synchronized void close(@Nullable Throwable throwable) {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+
if (flushTask != null) {
flushTask.cancel(false);
}
@@ -290,13 +323,9 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
var futs = pendingRequests.values().toArray(new
CompletableFuture[0]);
- CompletableFuture.allOf(futs).whenComplete((v, e) -> {
+ CompletableFuture.allOf(futs).whenCompleteAsync((v, e) -> {
if (e != null) {
- if (resultSubscriber != null) {
- resultSubscriber.onError(e);
- }
-
- completionFut.completeExceptionally(e);
+ completeWithError(e);
} else {
if (resultSubscriber != null) {
resultSubscriber.onComplete();
@@ -304,16 +333,24 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
completionFut.complete(null);
}
- });
+ }, flushExecutor);
} else {
- completionFut.completeExceptionally(throwable);
+ // Collect failed/non-delivered items from failed requests and
pending buffers.
+ var futs = pendingRequests.values().toArray(new
CompletableFuture[0]);
- if (resultSubscriber != null) {
- resultSubscriber.onError(throwable);
- }
+ CompletableFuture.allOf(futs).whenCompleteAsync((v, e) ->
completeWithError(throwable), flushExecutor);
}
+ }
- closed = true;
+ private void completeWithError(Throwable throwable) {
+ buffers.values().forEach(buf -> buf.forEach(failedItems::add));
+ DataStreamerException streamerErr = new
DataStreamerException(failedItems, throwable);
+
+ completionFut.completeExceptionally(streamerErr);
+
+ if (resultSubscriber != null) {
+ resultSubscriber.onError(streamerErr);
+ }
}
private synchronized void requestMore() {
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java
new file mode 100644
index 0000000000..43d6cdb1a3
--- /dev/null
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.streamer;
+
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Direct publisher that ignores backpressure and sends items directly to the
subscribers.
+ */
+public class DirectPublisher<T> implements Publisher<T>, Subscription,
AutoCloseable {
+ private final AtomicLong requested = new AtomicLong();
+
+ private Subscriber<? super T> subscriber;
+
+ @Override
+ public void close() {
+ if (subscriber != null) {
+ subscriber.onComplete();
+ }
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber) {
+ assert this.subscriber == null : "Only one subscriber is supported";
+ this.subscriber = subscriber;
+ subscriber.onSubscribe(this);
+ }
+
+ @Override
+ public void request(long n) {
+ requested.addAndGet(n);
+ }
+
+ @Override
+ public void cancel() {
+ // No-op.
+ }
+
+ public void submit(T item) {
+ subscriber.onNext(item);
+ }
+
+ public long requested() {
+ return requested.get();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index bcb0827e62..c394ba9db5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -21,6 +21,9 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -28,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -45,8 +49,11 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.table.DataStreamerItem;
+import org.apache.ignite.table.DataStreamerOperationType;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.DataStreamerReceiver;
import org.apache.ignite.table.DataStreamerReceiverContext;
@@ -218,19 +225,22 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
public void testMissingKeyColumn() {
RecordView<Tuple> view = this.defaultTable().recordView();
+ DataStreamerItem<Tuple> item = DataStreamerItem.of(Tuple.create());
CompletableFuture<Void> streamerFut;
- try (var publisher = new SimplePublisher<Tuple>()) {
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
var options = DataStreamerOptions.builder().build();
streamerFut = view.streamData(publisher, options);
- var tuple = Tuple.create();
-
- publisher.submit(tuple);
+ publisher.submit(item);
}
var ex = assertThrows(CompletionException.class, () ->
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
- assertEquals("Missed key column: ID", ex.getCause().getMessage());
+ assertThat(ex.getMessage(), endsWith("Missed key column: ID"));
+
+ DataStreamerException cause = (DataStreamerException) ex.getCause();
+ assertEquals(1, cause.failedItems().size());
+ assertEquals(item, cause.failedItems().iterator().next());
}
@SuppressWarnings("Convert2MethodRef")
@@ -465,28 +475,87 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
public void testReceiverException(boolean async) {
CompletableFuture<Void> streamerFut;
+ Object key = 0;
+ Tuple item = tupleKey(1);
+
try (var publisher = new SubmissionPublisher<Tuple>()) {
streamerFut = defaultTable().recordView().streamData(
publisher,
t -> t,
- t -> 0,
+ t -> key,
ReceiverDescriptor.builder(TestReceiver.class).build(),
null,
DataStreamerOptions.builder().retryLimit(0).pageSize(1).build(),
async ? "throw-async" : "throw");
- publisher.submit(tupleKey(1));
+ publisher.submit(item);
}
var ex = assertThrows(CompletionException.class, () ->
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
- assertEquals(
- "Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: test",
- ex.getCause().getMessage());
+ assertThat(
+ ex.getCause().getMessage(),
+ endsWith("Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: test"));
+
+ DataStreamerException cause = (DataStreamerException) ex.getCause();
+ assertEquals(1, cause.failedItems().size());
+ assertEquals(item, cause.failedItems().iterator().next());
+ }
+
+ @Test
+ public void testFailedItems() {
+ RecordView<Tuple> view = defaultTable().recordView();
+
+ CompletableFuture<Void> streamerFut;
+
+ var invalidItemsAdded = new ArrayList<DataStreamerItem<Tuple>>();
+
+ try (var publisher = new DirectPublisher<DataStreamerItem<Tuple>>()) {
+ var options = DataStreamerOptions.builder()
+ .pageSize(10)
+ .perPartitionParallelOperations(3)
+ .autoFlushInterval(100)
+ .build();
+
+ streamerFut = view.streamData(publisher, options);
+
+ TestUtils.waitForCondition(() -> publisher.requested() > 0, 5000);
+
+ // Submit valid items.
+ for (int i = 0; i < 100; i++) {
+ publisher.submit(DataStreamerItem.of(tuple(i, "foo-" + i)));
+ }
+
+ TestUtils.waitForCondition(() -> view.contains(null,
tupleKey(99)), 5000);
+
+ // Submit invalid items.
+ for (int i = 200; i < 300; i++) {
+ DataStreamerItem<Tuple> item = DataStreamerItem.of(
+ Tuple.create().set("id", i).set("name1", "bar-" + i),
+ i % 2 == 0 ? DataStreamerOperationType.PUT :
DataStreamerOperationType.REMOVE);
+
+ try {
+ publisher.submit(item);
+ invalidItemsAdded.add(item);
+ } catch (Exception e) {
+ break;
+ }
+ }
+ }
+
+ var ex = assertThrows(CompletionException.class, () ->
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
+ DataStreamerException cause = (DataStreamerException) ex.getCause();
+ Set<?> failedItems = cause.failedItems();
+
+ assertThat(invalidItemsAdded.size(), is(greaterThan(10)));
+ assertEquals(invalidItemsAdded.size(), failedItems.size());
+
+ for (DataStreamerItem<Tuple> item : invalidItemsAdded) {
+ assertTrue(failedItems.contains(item), "Failed item not found: " +
item.get());
+ }
}
private void waitForKey(RecordView<Tuple> view, Tuple key) throws
InterruptedException {
assertTrue(waitForCondition(() -> {
- @SuppressWarnings("resource")
var tx = ignite().transactions().begin(new
TransactionOptions().readOnly(true));
try {
@@ -548,7 +617,6 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
}
}
- @SuppressWarnings("resource")
private static class TestReceiver implements DataStreamerReceiver<String,
Object, String> {
@Override
public CompletableFuture<List<String>> receive(List<String> page,
DataStreamerReceiverContext ctx, Object arg) {
@@ -573,7 +641,6 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
}
}
- @SuppressWarnings("resource")
private static class NodeNameReceiver implements
DataStreamerReceiver<Integer, Object, Void> {
@Override
public @Nullable CompletableFuture<List<Void>> receive(List<Integer>
page, DataStreamerReceiverContext ctx, Object arg) {
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
index 072abe1d24..278cff9a2d 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -256,7 +257,7 @@ public abstract class ItNotNullConstraintTest extends
ClusterPerClassIntegration
publisher.submit(DataStreamerItem.of(item));
}
- assertThrows(MarshallerException.class, () -> await(streamerFut),
error);
+ assertThrowsWithCause(() -> await(streamerFut),
MarshallerException.class, error);
}
private static <R> void checkDataStreamer(RecordView<R> view, R item,
String error) {
@@ -267,7 +268,7 @@ public abstract class ItNotNullConstraintTest extends
ClusterPerClassIntegration
publisher.submit(DataStreamerItem.of(item));
}
- assertThrows(MarshallerException.class, () -> await(streamerFut),
error);
+ assertThrowsWithCause(() -> await(streamerFut),
MarshallerException.class, error);
}
static class Val {