This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 67b3167075 IGNITE-23621 Data Streamer: report failed entries on error 
(#4691)
67b3167075 is described below

commit 67b3167075af3f08a1a6a3153643bb0e1bf5bec4
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Nov 8 17:27:48 2024 +0200

    IGNITE-23621 Data Streamer: report failed entries on error (#4691)
    
    * Save all items that were not successfully sent to the cluster and return 
them in DataStreamerException#failedItems
    * Refuse items when closed (normally or due to an exception)
---
 .../apache/ignite/table/DataStreamerException.java | 84 +++++++++++++++++++
 ...ctClientStreamerPartitionAwarenessProvider.java | 13 +--
 .../org/apache/ignite/client/DataStreamerTest.java | 14 +++-
 .../ignite/internal/streamer/StreamerBuffer.java   | 28 +++----
 .../internal/streamer/StreamerSubscriber.java      | 87 ++++++++++++++------
 .../ignite/internal/streamer/DirectPublisher.java  | 64 +++++++++++++++
 .../streamer/ItAbstractDataStreamerTest.java       | 93 +++++++++++++++++++---
 .../sql/engine/ItNotNullConstraintTest.java        |  5 +-
 8 files changed, 322 insertions(+), 66 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java
new file mode 100644
index 0000000000..93fd3ec12c
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerException.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents an exception that is thrown during data streaming. Includes 
information about failed items.
+ */
+public final class DataStreamerException extends IgniteException {
+    private static final long serialVersionUID = -7516813059448101081L;
+
+    private final Set<?> failedItems;
+
+    /**
+     * Constructor.
+     *
+     * @param failedItems Set of failed items.
+     * @param cause Cause of the exception.
+     */
+    public DataStreamerException(Set<?> failedItems, Throwable cause) {
+        super(getCode(cause), cause.getMessage(), cause);
+
+        this.failedItems = failedItems;
+    }
+
+    /**
+     * Creates an exception with the given trace ID, error code, detailed 
message, and cause.
+     *
+     * @param traceId Unique identifier of the exception.
+     * @param code Full error code.
+     * @param message Detailed message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public DataStreamerException(UUID traceId, int code, String message, 
@Nullable Throwable cause) {
+        super(traceId, code, message, cause);
+        this.failedItems = Set.of();
+    }
+
+    /**
+     * Gets the set of items that were not streamed to the cluster.
+     *
+     * @return Set of failed items.
+     */
+    public Set<?> failedItems() {
+        return failedItems;
+    }
+
+    private static int getCode(Throwable cause) {
+        if (cause instanceof IgniteException) {
+            IgniteException e = (IgniteException) cause;
+
+            return e.code();
+        }
+
+        if (cause.getCause() instanceof IgniteException) {
+            IgniteException e = (IgniteException) cause.getCause();
+
+            return e.code();
+        }
+
+        return INTERNAL_ERR;
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
index 0cd37f657e..b79616c0ef 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientStreamerPartitionAwarenessProvider.java
@@ -27,8 +27,8 @@ import 
org.apache.ignite.internal.streamer.StreamerPartitionAwarenessProvider;
  */
 abstract class AbstractClientStreamerPartitionAwarenessProvider<T> implements 
StreamerPartitionAwarenessProvider<T, Integer> {
     private final ClientTable tbl;
-    private int partitions = -1;
-    private ClientSchema schema;
+    private volatile int partitions = -1;
+    private volatile ClientSchema schema;
 
     AbstractClientStreamerPartitionAwarenessProvider(ClientTable tbl) {
         this.tbl = tbl;
@@ -36,12 +36,15 @@ abstract class 
AbstractClientStreamerPartitionAwarenessProvider<T> implements St
 
     @Override
     public Integer partition(T item) {
-        if (schema == null || partitions < 0) {
+        ClientSchema schema0 = schema;
+        int partitions0 = partitions;
+
+        if (schema0 == null || partitions0 < 0) {
             throw new 
IllegalStateException("StreamerPartitionAwarenessProvider.refresh() was not 
called or awaited.");
         }
 
-        int hash = colocationHash(schema, item);
-        return Math.abs(hash % partitions);
+        int hash = colocationHash(schema0, item);
+        return Math.abs(hash % partitions0);
     }
 
     abstract int colocationHash(ClientSchema schema, T item);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
index a4813fb9f0..bca9099035 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.client;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -61,6 +61,7 @@ import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
 import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.table.DataStreamerException;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.DataStreamerReceiver;
@@ -627,10 +628,15 @@ public class DataStreamerTest extends 
AbstractClientTableTest {
             }
         }
 
-        assertThat(streamerFut, willThrow(ArithmeticException.class, "Result 
subscriber exception"));
+        assertThat(streamerFut, 
willThrowWithCauseOrSuppressed(ArithmeticException.class, "Result subscriber 
exception"));
         assertFalse(resultSubscriber.completed.get());
-        assertInstanceOf(CompletionException.class, 
resultSubscriber.error.get());
-        assertInstanceOf(ArithmeticException.class, 
resultSubscriber.error.get().getCause());
+
+        Throwable subscriberErr = resultSubscriber.error.get();
+        assertInstanceOf(DataStreamerException.class, subscriberErr);
+        assertInstanceOf(CompletionException.class, subscriberErr.getCause());
+        assertInstanceOf(ArithmeticException.class, 
subscriberErr.getCause().getCause());
+
+        assertEquals(3, ((DataStreamerException) 
subscriberErr).failedItems().size());
     }
 
     @Test
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
index b79c8e7f75..20d9cb1cd7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
@@ -18,27 +18,23 @@
 package org.apache.ignite.internal.streamer;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 class StreamerBuffer<T> {
     private final int capacity;
 
-    private final BiConsumer<List<T>, BitSet> flusher;
+    private final Consumer<List<T>> flusher;
 
     /** Primary buffer. Won't grow over capacity. */
     private List<T> buf;
 
-    private BitSet deleted;
-
     private boolean closed;
 
-    StreamerBuffer(int capacity, BiConsumer<List<T>, BitSet> flusher) {
+    StreamerBuffer(int capacity, Consumer<List<T>> flusher) {
         this.capacity = capacity;
         this.flusher = flusher;
         buf = new ArrayList<>(capacity);
-        deleted = new BitSet(capacity);
     }
 
     /**
@@ -46,21 +42,16 @@ class StreamerBuffer<T> {
      *
      * @param item Item.
      */
-    synchronized void add(T item, boolean delete) {
+    synchronized void add(T item) {
         if (closed) {
             throw new IllegalStateException("Streamer is closed, can't add 
items.");
         }
 
         buf.add(item);
 
-        if (delete) {
-            deleted.set(buf.size() - 1);
-        }
-
         if (buf.size() >= capacity) {
-            flusher.accept(buf, deleted);
+            flusher.accept(buf);
             buf = new ArrayList<>(capacity);
-            deleted = new BitSet(capacity);
         }
     }
 
@@ -72,7 +63,7 @@ class StreamerBuffer<T> {
         closed = true;
 
         if (!buf.isEmpty()) {
-            flusher.accept(buf, deleted);
+            flusher.accept(buf);
         }
     }
 
@@ -81,8 +72,11 @@ class StreamerBuffer<T> {
             return;
         }
 
-        flusher.accept(buf, deleted);
+        flusher.accept(buf);
         buf = new ArrayList<>(capacity);
-        deleted = new BitSet(capacity);
+    }
+
+    synchronized void forEach(Consumer<T> consumer) {
+        buf.forEach(consumer);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 14fe51fd7b..5087a76287 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -17,8 +17,13 @@
 
 package org.apache.ignite.internal.streamer;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.table.DataStreamerException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -66,7 +72,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
 
     // NOTE: This can accumulate empty buffers for stopped/failed nodes. 
Cleaning up is not trivial in concurrent scenario.
     // We don't expect thousands of node failures, so it should be fine.
-    private final ConcurrentHashMap<P, StreamerBuffer<V>> buffers = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<P, StreamerBuffer<E>> buffers = new 
ConcurrentHashMap<>();
 
     private final ConcurrentMap<P, CompletableFuture<Collection<R>>> 
pendingRequests = new ConcurrentHashMap<>();
 
@@ -76,6 +82,8 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
 
     private final ScheduledExecutorService flushExecutor;
 
+    private final Set<E> failedItems = Collections.synchronizedSet(new 
HashSet<>());
+
     private @Nullable Flow.Subscription subscription;
 
     private @Nullable ResultSubscription resultSubscription;
@@ -159,20 +167,21 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
 
     /** {@inheritDoc} */
     @Override
-    public void onNext(E item) {
+    public synchronized void onNext(E item) {
+        if (closed) {
+            throw new IllegalStateException("Streamer is closed, can't add 
items.");
+        }
+
         pendingItemCount.decrementAndGet();
 
         T key = keyFunc.apply(item);
         P partition = partitionAwarenessProvider.partition(key);
 
-        StreamerBuffer<V> buf = buffers.computeIfAbsent(
+        StreamerBuffer<E> buf = buffers.computeIfAbsent(
                 partition,
-                p -> new StreamerBuffer<>(options.pageSize(), (items, deleted) 
-> enlistBatch(p, items, deleted)));
+                p -> new StreamerBuffer<>(options.pageSize(), items -> 
enlistBatch(p, items)));
 
-        V payload = payloadFunc.apply(item);
-        boolean delete = deleteFunc.apply(item);
-
-        buf.add(payload, delete);
+        buf.add(item);
         this.metrics.streamerItemsQueuedAdd(1);
 
         requestMore();
@@ -199,7 +208,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
         return completionFut;
     }
 
-    private void enlistBatch(P partition, Collection<V> batch, BitSet deleted) 
{
+    private void enlistBatch(P partition, List<E> batch) {
         int batchSize = batch.size();
         assert batchSize > 0 : "Batch size must be positive.";
         assert partition != null : "Partition must not be null.";
@@ -210,18 +219,34 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
         pendingRequests.compute(
                 partition,
                 // Chain existing futures to preserve request order.
-                (part, fut) -> fut == null ? sendBatch(part, batch, deleted) : 
fut.thenCompose(v -> sendBatch(part, batch, deleted))
+                (part, fut) -> fut == null
+                        ? sendBatch(part, batch)
+                        : fut.whenComplete((res, err) -> {
+                            if (err != null) {
+                                failedItems.addAll(batch);
+                            }
+                        }).thenCompose(v -> sendBatch(part, batch))
         );
     }
 
-    private CompletableFuture<Collection<R>> sendBatch(P partition, 
Collection<V> batch, BitSet deleted) {
+    private CompletableFuture<Collection<R>> sendBatch(P partition, List<E> 
batch) {
         // If a connection fails, the batch goes to default connection thanks 
to built-it retry mechanism.
         try {
-            return batchSender.sendAsync(partition, batch, 
deleted).whenComplete((res, err) -> {
+            var items = new ArrayList<V>();
+            var deleted = new BitSet(batch.size());
+
+            for (E e : batch) {
+                items.add(payloadFunc.apply(e));
+                deleted.set(items.size() - 1, deleteFunc.apply(e));
+            }
+
+            return batchSender.sendAsync(partition, items, 
deleted).whenComplete((res, err) -> {
                 if (err != null) {
                     // Retry is handled by the sender (RetryPolicy in 
ReliableChannel on the client, sendWithRetry on the server).
                     // If we get here, then retries are exhausted and we 
should fail the streamer.
                     log.error("Failed to send batch to partition " + partition 
+ ": " + err.getMessage(), err);
+
+                    failedItems.addAll(batch);
                     close(err);
                 } else {
                     int batchSize = batch.size();
@@ -244,8 +269,10 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
                     invokeResultSubscriber(res);
                 }
             });
-        } catch (Exception e) {
+        } catch (Throwable e) {
             log.error("Failed to send batch to partition " + partition + ": " 
+ e.getMessage(), e);
+
+            failedItems.addAll(batch);
             close(e);
             return CompletableFuture.failedFuture(e);
         }
@@ -276,6 +303,12 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
     }
 
     private synchronized void close(@Nullable Throwable throwable) {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+
         if (flushTask != null) {
             flushTask.cancel(false);
         }
@@ -290,13 +323,9 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
 
             var futs = pendingRequests.values().toArray(new 
CompletableFuture[0]);
 
-            CompletableFuture.allOf(futs).whenComplete((v, e) -> {
+            CompletableFuture.allOf(futs).whenCompleteAsync((v, e) -> {
                 if (e != null) {
-                    if (resultSubscriber != null) {
-                        resultSubscriber.onError(e);
-                    }
-
-                    completionFut.completeExceptionally(e);
+                    completeWithError(e);
                 } else {
                     if (resultSubscriber != null) {
                         resultSubscriber.onComplete();
@@ -304,16 +333,24 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
 
                     completionFut.complete(null);
                 }
-            });
+            }, flushExecutor);
         } else {
-            completionFut.completeExceptionally(throwable);
+            // Collect failed/non-delivered items from failed requests and 
pending buffers.
+            var futs = pendingRequests.values().toArray(new 
CompletableFuture[0]);
 
-            if (resultSubscriber != null) {
-                resultSubscriber.onError(throwable);
-            }
+            CompletableFuture.allOf(futs).whenCompleteAsync((v, e) -> 
completeWithError(throwable), flushExecutor);
         }
+    }
 
-        closed = true;
+    private void completeWithError(Throwable throwable) {
+        buffers.values().forEach(buf -> buf.forEach(failedItems::add));
+        DataStreamerException streamerErr = new 
DataStreamerException(failedItems, throwable);
+
+        completionFut.completeExceptionally(streamerErr);
+
+        if (resultSubscriber != null) {
+            resultSubscriber.onError(streamerErr);
+        }
     }
 
     private synchronized void requestMore() {
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java
new file mode 100644
index 0000000000..43d6cdb1a3
--- /dev/null
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/streamer/DirectPublisher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.streamer;
+
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Direct publisher that ignores backpressure and sends items directly to the 
subscribers.
+ */
+public class DirectPublisher<T> implements Publisher<T>, Subscription, 
AutoCloseable {
+    private final AtomicLong requested = new AtomicLong();
+
+    private Subscriber<? super T> subscriber;
+
+    @Override
+    public void close() {
+        if (subscriber != null) {
+            subscriber.onComplete();
+        }
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        assert this.subscriber == null : "Only one subscriber is supported";
+        this.subscriber = subscriber;
+        subscriber.onSubscribe(this);
+    }
+
+    @Override
+    public void request(long n) {
+        requested.addAndGet(n);
+    }
+
+    @Override
+    public void cancel() {
+        // No-op.
+    }
+
+    public void submit(T item) {
+        subscriber.onNext(item);
+    }
+
+    public long requested() {
+        return requested.get();
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index bcb0827e62..c394ba9db5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -21,6 +21,9 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -28,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -45,8 +49,11 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.DataStreamerException;
 import org.apache.ignite.table.DataStreamerItem;
+import org.apache.ignite.table.DataStreamerOperationType;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.DataStreamerReceiver;
 import org.apache.ignite.table.DataStreamerReceiverContext;
@@ -218,19 +225,22 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
     public void testMissingKeyColumn() {
         RecordView<Tuple> view = this.defaultTable().recordView();
 
+        DataStreamerItem<Tuple> item = DataStreamerItem.of(Tuple.create());
         CompletableFuture<Void> streamerFut;
 
-        try (var publisher = new SimplePublisher<Tuple>()) {
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
             var options = DataStreamerOptions.builder().build();
             streamerFut = view.streamData(publisher, options);
 
-            var tuple = Tuple.create();
-
-            publisher.submit(tuple);
+            publisher.submit(item);
         }
 
         var ex = assertThrows(CompletionException.class, () -> 
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
-        assertEquals("Missed key column: ID", ex.getCause().getMessage());
+        assertThat(ex.getMessage(), endsWith("Missed key column: ID"));
+
+        DataStreamerException cause = (DataStreamerException) ex.getCause();
+        assertEquals(1, cause.failedItems().size());
+        assertEquals(item, cause.failedItems().iterator().next());
     }
 
     @SuppressWarnings("Convert2MethodRef")
@@ -465,28 +475,87 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
     public void testReceiverException(boolean async) {
         CompletableFuture<Void> streamerFut;
 
+        Object key = 0;
+        Tuple item = tupleKey(1);
+
         try (var publisher = new SubmissionPublisher<Tuple>()) {
             streamerFut = defaultTable().recordView().streamData(
                     publisher,
                     t -> t,
-                    t -> 0,
+                    t -> key,
                     ReceiverDescriptor.builder(TestReceiver.class).build(),
                     null,
                     
DataStreamerOptions.builder().retryLimit(0).pageSize(1).build(),
                     async ? "throw-async" : "throw");
 
-            publisher.submit(tupleKey(1));
+            publisher.submit(item);
         }
 
         var ex = assertThrows(CompletionException.class, () -> 
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
-        assertEquals(
-                "Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: test",
-                ex.getCause().getMessage());
+        assertThat(
+                ex.getCause().getMessage(),
+                endsWith("Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: test"));
+
+        DataStreamerException cause = (DataStreamerException) ex.getCause();
+        assertEquals(1, cause.failedItems().size());
+        assertEquals(item, cause.failedItems().iterator().next());
+    }
+
+    @Test
+    public void testFailedItems() {
+        RecordView<Tuple> view = defaultTable().recordView();
+
+        CompletableFuture<Void> streamerFut;
+
+        var invalidItemsAdded = new ArrayList<DataStreamerItem<Tuple>>();
+
+        try (var publisher = new DirectPublisher<DataStreamerItem<Tuple>>()) {
+            var options = DataStreamerOptions.builder()
+                    .pageSize(10)
+                    .perPartitionParallelOperations(3)
+                    .autoFlushInterval(100)
+                    .build();
+
+            streamerFut = view.streamData(publisher, options);
+
+            TestUtils.waitForCondition(() -> publisher.requested() > 0, 5000);
+
+            // Submit valid items.
+            for (int i = 0; i < 100; i++) {
+                publisher.submit(DataStreamerItem.of(tuple(i, "foo-" + i)));
+            }
+
+            TestUtils.waitForCondition(() -> view.contains(null, 
tupleKey(99)), 5000);
+
+            // Submit invalid items.
+            for (int i = 200; i < 300; i++) {
+                DataStreamerItem<Tuple> item = DataStreamerItem.of(
+                        Tuple.create().set("id", i).set("name1", "bar-" + i),
+                        i % 2 == 0 ? DataStreamerOperationType.PUT : 
DataStreamerOperationType.REMOVE);
+
+                try {
+                    publisher.submit(item);
+                    invalidItemsAdded.add(item);
+                } catch (Exception e) {
+                    break;
+                }
+            }
+        }
+
+        var ex = assertThrows(CompletionException.class, () -> 
streamerFut.orTimeout(1, TimeUnit.SECONDS).join());
+        DataStreamerException cause = (DataStreamerException) ex.getCause();
+        Set<?> failedItems = cause.failedItems();
+
+        assertThat(invalidItemsAdded.size(), is(greaterThan(10)));
+        assertEquals(invalidItemsAdded.size(), failedItems.size());
+
+        for (DataStreamerItem<Tuple> item : invalidItemsAdded) {
+            assertTrue(failedItems.contains(item), "Failed item not found: " + 
item.get());
+        }
     }
 
     private void waitForKey(RecordView<Tuple> view, Tuple key) throws 
InterruptedException {
         assertTrue(waitForCondition(() -> {
-            @SuppressWarnings("resource")
             var tx = ignite().transactions().begin(new 
TransactionOptions().readOnly(true));
 
             try {
@@ -548,7 +617,6 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         }
     }
 
-    @SuppressWarnings("resource")
     private static class TestReceiver implements DataStreamerReceiver<String, 
Object, String> {
         @Override
         public CompletableFuture<List<String>> receive(List<String> page, 
DataStreamerReceiverContext ctx, Object arg) {
@@ -573,7 +641,6 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         }
     }
 
-    @SuppressWarnings("resource")
     private static class NodeNameReceiver implements 
DataStreamerReceiver<Integer, Object, Void> {
         @Override
         public @Nullable CompletableFuture<List<Void>> receive(List<Integer> 
page, DataStreamerReceiverContext ctx, Object arg) {
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
index 072abe1d24..278cff9a2d 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItNotNullConstraintTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -256,7 +257,7 @@ public abstract class ItNotNullConstraintTest extends 
ClusterPerClassIntegration
             publisher.submit(DataStreamerItem.of(item));
         }
 
-        assertThrows(MarshallerException.class, () -> await(streamerFut), 
error);
+        assertThrowsWithCause(() -> await(streamerFut), 
MarshallerException.class, error);
     }
 
     private static <R> void checkDataStreamer(RecordView<R> view, R item, 
String error) {
@@ -267,7 +268,7 @@ public abstract class ItNotNullConstraintTest extends 
ClusterPerClassIntegration
             publisher.submit(DataStreamerItem.of(item));
         }
 
-        assertThrows(MarshallerException.class, () -> await(streamerFut), 
error);
+        assertThrowsWithCause(() -> await(streamerFut), 
MarshallerException.class, error);
     }
 
     static class Val {

Reply via email to