This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a1fe0da528 IGNITE-20181: KV/Binary view public API should only throw
public exceptions (#2523)
a1fe0da528 is described below
commit a1fe0da528ab930ad806056258a3e2194395998b
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Sep 8 13:30:38 2023 +0300
IGNITE-20181: KV/Binary view public API should only throw public exceptions
(#2523)
---
.../apache/ignite/lang/MarshallerException.java | 18 +++++-
.../ignite/lang/UnexpectedNullValueException.java | 3 +-
.../ignite/internal/IgniteExceptionArchTest.java | 2 -
.../Table/SchemaValidationTest.cs | 16 +++---
.../ignite/internal/table/AbstractTableView.java | 34 ++++-------
.../internal/table/KeyValueBinaryViewImpl.java | 46 ++++++++-------
...aluePojoStreamerPartitionAwarenessProvider.java | 2 +-
.../ignite/internal/table/KeyValueViewImpl.java | 67 ++++++++++++----------
.../internal/table/RecordBinaryViewImpl.java | 49 +++++++++-------
.../ignite/internal/table/RecordViewImpl.java | 59 ++++++++++---------
.../internal/table/TableViewRowConverter.java | 66 +++++++++++++++++++++
11 files changed, 226 insertions(+), 136 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java
b/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java
index 5a10d26718..98f1d7d41b 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java
@@ -17,6 +17,10 @@
package org.apache.ignite.lang;
+import java.util.UUID;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.jetbrains.annotations.Nullable;
+
/**
* This exception is caused by a failure to marshall or unmarshall a value.
* The failure can be due to a value not matching the a schema or to another
reason.
@@ -28,6 +32,18 @@ public class MarshallerException extends IgniteException {
* @param cause Non-null throwable cause.
*/
public MarshallerException(Throwable cause) {
- super(cause);
+ super(Common.INTERNAL_ERR, cause);
+ }
+
+ /**
+ * Creates an exception with the given trace ID, error code, detailed
message, and cause.
+ *
+ * @param traceId Unique identifier of the exception.
+ * @param code Full error code.
+ * @param message Detailed message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public MarshallerException(UUID traceId, int code, String message,
@Nullable Throwable cause) {
+ super(traceId, code, message, cause);
}
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java
b/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java
index 4a5acdd428..4caf9697b7 100644
---
a/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java
+++
b/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.lang;
import java.util.UUID;
+import org.apache.ignite.lang.ErrorGroups.Common;
/**
* This exception is thrown instead of returning a null value from a method
that does not respect {@code null}-value to avoid ambiguity
@@ -30,7 +31,7 @@ public class UnexpectedNullValueException extends
IgniteException {
* @param msg Message.
*/
public UnexpectedNullValueException(String msg) {
- super("Got unexpected null value: " + msg);
+ super(Common.INTERNAL_ERR, msg);
}
/**
diff --git
a/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java
b/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java
index f70ee7cd5a..cb94459420 100644
---
a/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java
+++
b/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java
@@ -39,7 +39,6 @@ import
org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException
import org.apache.ignite.lang.IgniteCheckedException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.LocationProvider.RootLocationProvider;
-import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.network.UnresolvableConsistentIdException;
import org.apache.ignite.security.AuthenticationException;
import org.apache.ignite.security.UnknownAuthenticationTypeException;
@@ -86,7 +85,6 @@ public class IgniteExceptionArchTest {
exclusions.add(IgniteClientAuthenticationException.class.getCanonicalName());
exclusions.add(IgniteClientConnectionException.class.getCanonicalName());
exclusions.add(IgniteClientFeatureNotSupportedByServerException.class.getCanonicalName());
- exclusions.add(MarshallerException.class.getCanonicalName());
exclusions.add(UnresolvableConsistentIdException.class.getCanonicalName());
exclusions.add(AuthenticationException.class.getCanonicalName());
exclusions.add(UnknownAuthenticationTypeException.class.getCanonicalName());
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs
index 98f2d3e3ee..a512f7264c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs
@@ -106,7 +106,7 @@ public class SchemaValidationTest : IgniteTestsBase
[ValCol] = "v"
};
- var ex = Assert.ThrowsAsync<IgniteException>(async () => await
TupleView.UpsertAsync(null, igniteTuple));
+ var ex = Assert.ThrowsAsync<MarshallerException>(async () => await
TupleView.UpsertAsync(null, igniteTuple));
Assert.AreEqual("Missed key column: KEY", ex!.Message);
}
@@ -118,7 +118,7 @@ public class SchemaValidationTest : IgniteTestsBase
[KeyCol] = 1L
};
- var ex = Assert.ThrowsAsync<IgniteException>(async () => await
TableRequiredVal.RecordBinaryView.UpsertAsync(null, igniteTuple));
+ var ex = Assert.ThrowsAsync<MarshallerException>(async () => await
TableRequiredVal.RecordBinaryView.UpsertAsync(null, igniteTuple));
StringAssert.StartsWith("Failed to set column (null was passed, but
column is not null", ex!.Message);
StringAssert.Contains("name=VAL", ex.Message);
}
@@ -133,7 +133,7 @@ public class SchemaValidationTest : IgniteTestsBase
[ValCol] = "v"
};
- var ex = Assert.ThrowsAsync<IgniteException>(async () => await
Table.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple));
+ var ex = Assert.ThrowsAsync<MarshallerException>(async () => await
Table.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple));
Assert.AreEqual("Missed key column: KEY", ex!.Message);
}
@@ -147,7 +147,7 @@ public class SchemaValidationTest : IgniteTestsBase
var valTuple = new IgniteTuple();
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<MarshallerException>(
async () => await
TableRequiredVal.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple));
StringAssert.StartsWith("Failed to set column (null was passed, but
column is not null", ex!.Message);
StringAssert.Contains("name=VAL", ex.Message);
@@ -248,7 +248,7 @@ public class SchemaValidationTest : IgniteTestsBase
[Test]
public void TestMissingKeyPocoFields()
{
- var ex = Assert.ThrowsAsync<IgniteException>(async () => await
Table.GetRecordView<ValPoco>().UpsertAsync(null, new ValPoco()));
+ var ex = Assert.ThrowsAsync<MarshallerException>(async () => await
Table.GetRecordView<ValPoco>().UpsertAsync(null, new ValPoco()));
Assert.AreEqual("Missed key column: KEY", ex!.Message);
}
@@ -256,7 +256,7 @@ public class SchemaValidationTest : IgniteTestsBase
[Test]
public void TestMissingValPocoFields()
{
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<MarshallerException>(
async () => await
TableRequiredVal.GetRecordView<KeyPoco>().UpsertAsync(null, new KeyPoco()));
StringAssert.StartsWith("Failed to set column (null was passed, but
column is not null", ex!.Message);
@@ -266,7 +266,7 @@ public class SchemaValidationTest : IgniteTestsBase
[Test]
public void TestKvMissingKeyPocoFields()
{
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<MarshallerException>(
async () => await Table.GetKeyValueView<ValPoco,
string>().PutAsync(null, new ValPoco(), "x"));
Assert.AreEqual("Missed key column: KEY", ex!.Message);
@@ -275,7 +275,7 @@ public class SchemaValidationTest : IgniteTestsBase
[Test]
public void TestKvMissingValPocoFields()
{
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<MarshallerException>(
async () => await TableRequiredVal.GetKeyValueView<long,
KeyPoco>().PutAsync(null, 1L, new KeyPoco()));
StringAssert.StartsWith("Failed to set column (null was passed, but
column is not null", ex!.Message);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 056ec46767..bfca56c948 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.table;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.IgniteExceptionMapperUtil;
/**
* Base class for Table views.
@@ -30,8 +32,8 @@ abstract class AbstractTableView {
/** Internal table. */
protected final InternalTable tbl;
- /** Schema registry. */
- protected final SchemaRegistry schemaReg;
+ /** Table row view converter. */
+ protected final TableViewRowConverter rowConverter;
/**
* Constructor.
@@ -41,7 +43,7 @@ abstract class AbstractTableView {
*/
protected AbstractTableView(InternalTable tbl, SchemaRegistry schemaReg) {
this.tbl = tbl;
- this.schemaReg = schemaReg;
+ this.rowConverter = new TableViewRowConverter(schemaReg);
}
/**
@@ -57,26 +59,10 @@ abstract class AbstractTableView {
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt flag.
- throw convertException(e);
+ throw
sneakyThrow(IgniteExceptionMapperUtil.mapToPublicException(e));
} catch (ExecutionException e) {
- throw convertException(e.getCause());
- } catch (IgniteInternalException e) {
- throw convertException(e);
- }
- }
-
- /**
- * Converts an internal exception to a public one.
- *
- * @param th Internal exception.
- * @return Public exception.
- */
- protected IgniteException convertException(Throwable th) {
- if (th instanceof IgniteException) {
- return (IgniteException) th;
+ Throwable cause = ExceptionUtils.unwrapCause(e);
+ throw sneakyThrow(cause);
}
-
- //TODO: IGNITE-20181 KV/Binary view public API should only throw
public exceptions for the end user.
- return new IgniteException(th);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 72c89b550c..85f7371093 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -36,6 +38,7 @@ import
org.apache.ignite.internal.streamer.StreamerBatchSender;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
@@ -78,7 +81,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView
implements KeyValu
Row keyRow = marshal(key, null);
- return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshalValue);
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshalValue));
}
/**
@@ -112,7 +115,8 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Tuple> getOrDefaultAsync(@Nullable Transaction
tx, Tuple key, Tuple defaultValue) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key), null);
- return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r ->
IgniteUtils.nonNullOrElse(unmarshalValue(r), defaultValue));
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx)
+ .thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalValue(r),
defaultValue)));
}
/** {@inheritDoc} */
@@ -126,7 +130,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable
Transaction tx, Collection<Tuple> keys) {
List<BinaryRowEx> keyRows = marshalKeys(Objects.requireNonNull(keys));
- return tbl.getAll(keyRows, (InternalTransaction)
tx).thenApply(this::unmarshalValue);
+ return convertToPublicFuture(tbl.getAll(keyRows, (InternalTransaction)
tx).thenApply(this::unmarshalValue));
}
/** {@inheritDoc} */
@@ -155,7 +159,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, val);
- return tbl.upsert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -169,7 +173,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx,
Map<Tuple, Tuple> pairs) {
Objects.requireNonNull(pairs);
- return tbl.upsertAll(marshalPairs(pairs.entrySet()),
(InternalTransaction) tx);
+ return
convertToPublicFuture(tbl.upsertAll(marshalPairs(pairs.entrySet()),
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -186,7 +190,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, val);
- return tbl.getAndUpsert(row, (InternalTransaction)
tx).thenApply(this::unmarshalValue);
+ return convertToPublicFuture(tbl.getAndUpsert(row,
(InternalTransaction) tx).thenApply(this::unmarshalValue));
}
/**
@@ -224,7 +228,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, val);
- return tbl.insert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.insert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -246,7 +250,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, null);
- return tbl.delete(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.delete(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -257,7 +261,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, val);
- return tbl.deleteExact(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.deleteExact(row,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -271,8 +275,8 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable
Transaction tx, Collection<Tuple> keys) {
List<BinaryRowEx> keyRows = marshalKeys(Objects.requireNonNull(keys));
- return tbl.deleteAll(keyRows, (InternalTransaction) tx)
- .thenApply(this::unmarshalKeys);
+ return convertToPublicFuture(tbl.deleteAll(keyRows,
(InternalTransaction) tx)
+ .thenApply(this::unmarshalKeys));
}
/** {@inheritDoc} */
@@ -288,7 +292,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Tuple> getAndRemoveAsync(@Nullable Transaction
tx, Tuple key) {
Objects.requireNonNull(key);
- return tbl.getAndDelete(marshal(key, null), (InternalTransaction)
tx).thenApply(this::unmarshalValue);
+ return convertToPublicFuture(tbl.getAndDelete(marshal(key, null),
(InternalTransaction) tx).thenApply(this::unmarshalValue));
}
/**
@@ -331,7 +335,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row row = marshal(key, val);
- return tbl.replace(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -349,7 +353,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Row oldRow = marshal(key, oldVal);
Row newRow = marshal(key, newVal);
- return tbl.replace(oldRow, newRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(oldRow, newRow,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -364,7 +368,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return tbl.getAndReplace(marshal(key, val), (InternalTransaction)
tx).thenApply(this::unmarshalValue);
+ return convertToPublicFuture(tbl.getAndReplace(marshal(key, val),
(InternalTransaction) tx).thenApply(this::unmarshalValue));
}
/**
@@ -400,7 +404,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
try {
return marsh.marshal(key, val);
} catch (TupleMarshallerException ex) {
- throw convertException(ex);
+ throw new MarshallerException(ex);
}
}
@@ -415,7 +419,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
return null;
}
- return TableRow.valueTuple(schemaReg.resolve(row));
+ return TableRow.valueTuple(rowConverter.resolveRow(row));
}
/**
@@ -427,7 +431,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
private Map<Tuple, Tuple> unmarshalValue(Collection<BinaryRow> rows) {
Map<Tuple, Tuple> pairs = IgniteUtils.newHashMap(rows.size());
- for (Row row : schemaReg.resolve(rows)) {
+ for (Row row : rowConverter.resolveRows(rows)) {
if (row != null) {
pairs.put(TableRow.keyTuple(row), TableRow.valueTuple(row));
}
@@ -468,7 +472,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
List<Tuple> tuples = new ArrayList<>(rows.size());
- for (Row row : schemaReg.resolveKeys(rows)) {
+ for (Row row : rowConverter.resolveKeys(rows)) {
tuples.add(TableRow.keyTuple(row));
}
@@ -480,9 +484,9 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
public CompletableFuture<Void> streamData(Publisher<Entry<Tuple, Tuple>>
publisher, @Nullable DataStreamerOptions options) {
Objects.requireNonNull(publisher);
- var partitioner = new
KeyValueTupleStreamerPartitionAwarenessProvider(schemaReg, tbl.partitions());
+ var partitioner = new
KeyValueTupleStreamerPartitionAwarenessProvider(rowConverter.registry(),
tbl.partitions());
StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender =
- (partitionId, items) -> tbl.upsertAll(marshalPairs(items),
partitionId);
+ (partitionId, items) ->
convertToPublicFuture(this.tbl.upsertAll(marshalPairs(items), partitionId));
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java
index 6668cd7783..f34b2e49b9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java
@@ -52,7 +52,7 @@ class KeyValuePojoStreamerPartitionAwarenessProvider<K, V>
extends AbstractClien
return hashCalc.hash();
} catch (MarshallerException e) {
- throw new RuntimeException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 7c29c161dc..167d656370 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -87,7 +89,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView
implements KeyValu
public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshallValue);
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshallValue));
}
/** {@inheritDoc} */
@@ -101,7 +103,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable
Transaction tx, K key) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r -> r ==
null ? null : NullableValue.of(unmarshalNullableValue(r)));
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx)
+ .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r))));
}
/** {@inheritDoc} */
@@ -115,7 +118,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K
key, V defaultValue) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r ->
IgniteUtils.nonNullOrElse(unmarshalNullableValue(r), defaultValue));
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx)
+ .thenApply(r ->
IgniteUtils.nonNullOrElse(unmarshalNullableValue(r), defaultValue)));
}
/** {@inheritDoc} */
@@ -129,7 +133,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx,
Collection<K> keys) {
Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keys));
- return tbl.getAll(rows, (InternalTransaction)
tx).thenApply(this::unmarshalPairs);
+ return convertToPublicFuture(tbl.getAll(rows, (InternalTransaction)
tx).thenApply(this::unmarshalPairs));
}
/** {@inheritDoc} */
@@ -143,7 +147,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
K key) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(Objects::nonNull);
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction)
tx).thenApply(Objects::nonNull));
}
/** {@inheritDoc} */
@@ -157,7 +161,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key, V
val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.upsert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -171,7 +175,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx,
Map<K, V> pairs) {
Collection<BinaryRowEx> rows =
marshalPairs(Objects.requireNonNull(pairs).entrySet());
- return tbl.upsertAll(rows, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsertAll(rows, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -186,7 +190,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return tbl.getAndUpsert(marshal(key, val), (InternalTransaction)
tx).thenApply(this::unmarshallValue);
+ return convertToPublicFuture(tbl.getAndUpsert(marshal(key, val),
(InternalTransaction) tx).thenApply(this::unmarshallValue));
}
/** {@inheritDoc} */
@@ -200,8 +204,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<NullableValue<V>>
getNullableAndPutAsync(@Nullable Transaction tx, K key, V val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.getAndUpsert(row, (InternalTransaction) tx)
- .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r)));
+ return convertToPublicFuture(tbl.getAndUpsert(row,
(InternalTransaction) tx)
+ .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r))));
}
/** {@inheritDoc} */
@@ -215,7 +219,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction
tx, K key, V val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.insert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.insert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -235,7 +239,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key) {
BinaryRowEx row = marshal(Objects.requireNonNull(key));
- return tbl.delete(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.delete(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -243,7 +247,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key, V val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.deleteExact(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.deleteExact(row,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -257,7 +261,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Collection<K>> removeAllAsync(@Nullable
Transaction tx, Collection<K> keys) {
Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keys));
- return tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(this::unmarshalKeys);
+ return convertToPublicFuture(tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(this::unmarshalKeys));
}
/** {@inheritDoc} */
@@ -271,7 +275,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K
key) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.getAndDelete(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshallValue);
+ return convertToPublicFuture(tbl.getAndDelete(keyRow,
(InternalTransaction) tx).thenApply(this::unmarshallValue));
}
/** {@inheritDoc} */
@@ -285,8 +289,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<NullableValue<V>>
getNullableAndRemoveAsync(@Nullable Transaction tx, K key) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(key));
- return tbl.getAndDelete(keyRow, (InternalTransaction) tx)
- .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r)));
+ return convertToPublicFuture(tbl.getAndDelete(keyRow,
(InternalTransaction) tx)
+ .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r))));
}
/** {@inheritDoc} */
@@ -306,7 +310,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K
key, V val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.replace(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -317,7 +321,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
BinaryRowEx oldRow = marshal(key, oldVal);
BinaryRowEx newRow = marshal(key, newVal);
- return tbl.replace(oldRow, newRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(oldRow, newRow,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -332,7 +336,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return tbl.getAndReplace(marshal(key, val), (InternalTransaction)
tx).thenApply(this::unmarshallValue);
+ return convertToPublicFuture(tbl.getAndReplace(marshal(key, val),
(InternalTransaction) tx).thenApply(this::unmarshallValue));
}
/** {@inheritDoc} */
@@ -346,8 +350,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<NullableValue<V>>
getNullableAndReplaceAsync(@Nullable Transaction tx, K key, V val) {
BinaryRowEx row = marshal(Objects.requireNonNull(key), val);
- return tbl.getAndReplace(row, (InternalTransaction) tx)
- .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r)));
+ return convertToPublicFuture(tbl.getAndReplace(row,
(InternalTransaction) tx)
+ .thenApply(r -> r == null ? null :
NullableValue.of(unmarshalNullableValue(r))));
}
/**
@@ -364,7 +368,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
// TODO: Cache marshaller for schema version or upgrade row?
- return this.marsh =
marshallerFactory.apply(schemaReg.schema(schemaVersion));
+ SchemaRegistry registry = rowConverter.registry();
+ return this.marsh =
marshallerFactory.apply(registry.schema(schemaVersion));
}
/**
@@ -373,7 +378,8 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
* @return Marshaller.
*/
private KvMarshaller<K, V> marshaller() {
- return marshaller(schemaReg.lastSchemaVersion());
+ SchemaRegistry registry = rowConverter.registry();
+ return marshaller(registry.lastSchemaVersion());
}
/**
@@ -477,7 +483,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
List<K> keys = new ArrayList<>(rows.size());
try {
- for (Row row : schemaReg.resolveKeys(rows)) {
+ for (Row row : rowConverter.resolveKeys(rows)) {
if (row != null) {
keys.add(marsh.unmarshalKey(row));
}
@@ -500,7 +506,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
return null;
}
- Row row = schemaReg.resolve(binaryRow);
+ Row row = rowConverter.resolveRow(binaryRow);
KvMarshaller<K, V> marshaller = marshaller(row.schemaVersion());
@@ -527,7 +533,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
Map<K, V> pairs = IgniteUtils.newHashMap(rows.size());
try {
- for (Row row : schemaReg.resolve(rows)) {
+ for (Row row : rowConverter.resolveRows(rows)) {
if (row != null) {
pairs.put(marsh.unmarshalKey(row),
marsh.unmarshalValue(row));
}
@@ -555,7 +561,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
V v = unmarshalNullableValue(binaryRow);
if (v == null) {
- throw new UnexpectedNullValueException("use `getNullable` sibling
method instead.");
+ throw new UnexpectedNullValueException("Got unexpected null value:
use `getNullable` sibling method instead.");
}
return v;
@@ -566,8 +572,9 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
public CompletableFuture<Void> streamData(Publisher<Entry<K, V>>
publisher, @Nullable DataStreamerOptions options) {
Objects.requireNonNull(publisher);
- var partitioner = new
KeyValuePojoStreamerPartitionAwarenessProvider<>(schemaReg, tbl.partitions(),
marshaller());
- StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId,
items) -> tbl.upsertAll(marshalPairs(items), partitionId);
+ var partitioner = new
KeyValuePojoStreamerPartitionAwarenessProvider<>(rowConverter.registry(),
tbl.partitions(), marshaller());
+ StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId,
items)
+ ->
convertToPublicFuture(this.tbl.upsertAll(marshalPairs(items), partitionId));
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 5414446c11..327bb45839 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +35,7 @@ import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -71,7 +74,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row keyRow = marshal(keyRec, true); // Convert to portable
format to pass TX/storage layer.
- return tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::wrap);
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::wrap));
}
@Override
@@ -83,7 +86,8 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<List<Tuple>> getAllAsync(@Nullable Transaction
tx, Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
- return tbl.getAll(mapToBinary(keyRecs, true), (InternalTransaction)
tx).thenApply(binaryRows -> wrap(binaryRows, true));
+ return convertToPublicFuture(tbl.getAll(mapToBinary(keyRecs, true),
(InternalTransaction) tx)
+ .thenApply(binaryRows -> wrap(binaryRows, true)));
}
/** {@inheritDoc} */
@@ -99,7 +103,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.upsert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -113,7 +117,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx,
Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return tbl.upsertAll(mapToBinary(recs, false), (InternalTransaction)
tx);
+ return convertToPublicFuture(tbl.upsertAll(mapToBinary(recs, false),
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -129,7 +133,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.getAndUpsert(row, (InternalTransaction)
tx).thenApply(this::wrap);
+ return convertToPublicFuture(tbl.getAndUpsert(row,
(InternalTransaction) tx).thenApply(this::wrap));
}
/** {@inheritDoc} */
@@ -145,7 +149,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.insert(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.insert(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -159,7 +163,8 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable
Transaction tx, Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return tbl.insertAll(mapToBinary(recs, false), (InternalTransaction)
tx).thenApply(rows -> wrap(rows, false));
+ return convertToPublicFuture(tbl.insertAll(mapToBinary(recs, false),
(InternalTransaction) tx)
+ .thenApply(rows -> wrap(rows, false)));
}
/** {@inheritDoc} */
@@ -181,7 +186,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.replace(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -193,7 +198,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row oldRow = marshal(oldRec, false);
final Row newRow = marshal(newRec, false);
- return tbl.replace(oldRow, newRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(oldRow, newRow,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -209,7 +214,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.getAndReplace(row, (InternalTransaction)
tx).thenApply(this::wrap);
+ return convertToPublicFuture(tbl.getAndReplace(row,
(InternalTransaction) tx).thenApply(this::wrap));
}
/** {@inheritDoc} */
@@ -225,7 +230,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row keyRow = marshal(keyRec, true);
- return tbl.delete(keyRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.delete(keyRow, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -241,7 +246,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row row = marshal(rec, false);
- return tbl.deleteExact(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.deleteExact(row,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -257,7 +262,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
final Row keyRow = marshal(keyRec, true);
- return tbl.getAndDelete(keyRow, (InternalTransaction)
tx).thenApply(this::wrap);
+ return convertToPublicFuture(tbl.getAndDelete(keyRow,
(InternalTransaction) tx).thenApply(this::wrap));
}
/** {@inheritDoc} */
@@ -271,7 +276,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable
Transaction tx, Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
- return tbl.deleteAll(mapToBinary(keyRecs, true), (InternalTransaction)
tx).thenApply(this::wrapKeys);
+ return convertToPublicFuture(tbl.deleteAll(mapToBinary(keyRecs, true),
(InternalTransaction) tx).thenApply(this::wrapKeys));
}
/** {@inheritDoc} */
@@ -285,7 +290,8 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable
Transaction tx, Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return tbl.deleteAllExact(mapToBinary(recs, false),
(InternalTransaction) tx).thenApply(rows -> wrap(rows, false));
+ return convertToPublicFuture(tbl.deleteAllExact(mapToBinary(recs,
false), (InternalTransaction) tx)
+ .thenApply(rows -> wrap(rows, false)));
}
/**
@@ -304,7 +310,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
return marsh.marshal(tuple);
}
} catch (TupleMarshallerException ex) {
- throw convertException(ex);
+ throw new MarshallerException(ex);
}
}
@@ -314,7 +320,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
* @param row Binary row.
*/
private @Nullable Tuple wrap(@Nullable BinaryRow row) {
- return row == null ? null : TableRow.tuple(schemaReg.resolve(row));
+ return row == null ? null :
TableRow.tuple(rowConverter.resolveRow(row));
}
/**
@@ -330,7 +336,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
var wrapped = new ArrayList<Tuple>(rows.size());
- for (Row row : schemaReg.resolve(rows)) {
+ for (Row row : rowConverter.resolveRows(rows)) {
if (row != null) {
wrapped.add(TableRow.tuple(row));
} else if (addNull) {
@@ -348,7 +354,7 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
var wrapped = new ArrayList<Tuple>(rows.size());
- for (Row row : schemaReg.resolveKeys(rows)) {
+ for (Row row : rowConverter.resolveKeys(rows)) {
if (row != null) {
wrapped.add(TableRow.tuple(row));
}
@@ -379,8 +385,9 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
public CompletableFuture<Void> streamData(Publisher<Tuple> publisher,
@Nullable DataStreamerOptions options) {
Objects.requireNonNull(publisher);
- var partitioner = new
TupleStreamerPartitionAwarenessProvider(schemaReg, tbl.partitions());
- StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items)
-> tbl.upsertAll(mapToBinary(items, false), partitionId);
+ var partitioner = new
TupleStreamerPartitionAwarenessProvider(rowConverter.registry(),
tbl.partitions());
+ StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items)
->
+ convertToPublicFuture(this.tbl.upsertAll(mapToBinary(items,
false), partitionId));
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index eeebd2e02e..5229eaa2c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -35,7 +37,6 @@ import
org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerI
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.mapper.Mapper;
@@ -76,7 +77,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) {
BinaryRowEx keyRow = marshalKey(Objects.requireNonNull(keyRec));
- return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshal);
+ return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshal));
}
@Override
@@ -88,7 +89,8 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx,
Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
- return tbl.getAll(marshalKeys(keyRecs), (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, true));
+ return convertToPublicFuture(tbl.getAll(marshalKeys(keyRecs),
(InternalTransaction) tx)
+ .thenApply(binaryRows -> unmarshal(binaryRows, true)));
}
/** {@inheritDoc} */
@@ -102,7 +104,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R
rec) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec));
- return tbl.upsert(keyRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsert(keyRow, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -116,7 +118,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx,
Collection<R> recs) {
Objects.requireNonNull(recs);
- return tbl.upsertAll(marshal(recs), (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.upsertAll(marshal(recs),
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -130,7 +132,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R
rec) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec));
- return tbl.getAndUpsert(keyRow, (InternalTransaction)
tx).thenApply(this::unmarshal);
+ return convertToPublicFuture(tbl.getAndUpsert(keyRow,
(InternalTransaction) tx).thenApply(this::unmarshal));
}
/** {@inheritDoc} */
@@ -144,7 +146,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R
rec) {
BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec));
- return tbl.insert(keyRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.insert(keyRow, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -158,7 +160,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Collection<R>> insertAllAsync(@Nullable
Transaction tx, Collection<R> recs) {
Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(recs));
- return tbl.insertAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, false));
+ return convertToPublicFuture(tbl.insertAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, false)));
}
/** {@inheritDoc} */
@@ -178,7 +180,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R
rec) {
BinaryRowEx newRow = marshal(Objects.requireNonNull(rec));
- return tbl.replace(newRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(newRow, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -187,7 +189,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
BinaryRowEx oldRow = marshal(Objects.requireNonNull(oldRec));
BinaryRowEx newRow = marshal(Objects.requireNonNull(newRec));
- return tbl.replace(oldRow, newRow, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.replace(oldRow, newRow,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -201,7 +203,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R
rec) {
BinaryRowEx row = marshal(Objects.requireNonNull(rec));
- return tbl.getAndReplace(row, (InternalTransaction)
tx).thenApply(this::unmarshal);
+ return convertToPublicFuture(tbl.getAndReplace(row,
(InternalTransaction) tx).thenApply(this::unmarshal));
}
/** {@inheritDoc} */
@@ -215,7 +217,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R
keyRec) {
BinaryRowEx row = marshalKey(Objects.requireNonNull(keyRec));
- return tbl.delete(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.delete(row, (InternalTransaction)
tx));
}
/** {@inheritDoc} */
@@ -229,7 +231,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction
tx, R keyRec) {
BinaryRowEx row = marshal(Objects.requireNonNull(keyRec));
- return tbl.deleteExact(row, (InternalTransaction) tx);
+ return convertToPublicFuture(tbl.deleteExact(row,
(InternalTransaction) tx));
}
/** {@inheritDoc} */
@@ -243,7 +245,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R
keyRec) {
BinaryRowEx row = marshalKey(keyRec);
- return tbl.getAndDelete(row, (InternalTransaction)
tx).thenApply(this::unmarshal);
+ return convertToPublicFuture(tbl.getAndDelete(row,
(InternalTransaction) tx).thenApply(this::unmarshal));
}
/** {@inheritDoc} */
@@ -257,7 +259,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable
Transaction tx, Collection<R> keyRecs) {
Collection<BinaryRowEx> rows =
marshal(Objects.requireNonNull(keyRecs));
- return tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, false));
+ return convertToPublicFuture(tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, false)));
}
/** {@inheritDoc} */
@@ -271,7 +273,8 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable
Transaction tx, Collection<R> keyRecs) {
Collection<BinaryRowEx> rows =
marshal(Objects.requireNonNull(keyRecs));
- return tbl.deleteAllExact(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, false));
+ return convertToPublicFuture(tbl.deleteAllExact(rows,
(InternalTransaction) tx)
+ .thenApply(binaryRows -> unmarshal(binaryRows, false)));
}
/**
@@ -289,7 +292,8 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
// TODO: Cache marshaller for schema version or upgrade row?
- return this.marsh =
marshallerFactory.apply(schemaReg.schema(schemaVersion));
+ SchemaDescriptor schema =
rowConverter.registry().schema(schemaVersion);
+ return this.marsh = marshallerFactory.apply(schema);
}
/**
@@ -298,6 +302,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
* @return Marshaller.
*/
private RecordMarshaller<R> marshaller() {
+ SchemaRegistry schemaReg = rowConverter.registry();
return marshaller(schemaReg.lastSchemaVersion());
}
@@ -313,7 +318,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
try {
return marsh.marshal(rec);
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -337,7 +342,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
return rows;
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -353,7 +358,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
try {
return marsh.marshalKey(rec);
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -377,7 +382,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
return rows;
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -392,14 +397,14 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
return null;
}
- Row row = schemaReg.resolve(binaryRow);
+ Row row = rowConverter.resolveRow(binaryRow);
RecordMarshaller<R> marshaller = marshaller(row.schemaVersion());
try {
return marshaller.unmarshal(row);
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -420,7 +425,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
var recs = new ArrayList<R>(rows.size());
try {
- for (Row row : schemaReg.resolve(rows)) {
+ for (Row row : rowConverter.resolveRows(rows)) {
if (row != null) {
recs.add(marsh.unmarshal(row));
} else if (addNull) {
@@ -430,7 +435,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
return recs;
} catch (MarshallerException e) {
- throw new IgniteException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -439,8 +444,8 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
public CompletableFuture<Void> streamData(Publisher<R> publisher,
@Nullable DataStreamerOptions options) {
Objects.requireNonNull(publisher);
- var partitioner = new
PojoStreamerPartitionAwarenessProvider<>(schemaReg, tbl.partitions(),
marshaller());
- StreamerBatchSender<R, Integer> batchSender = (partitionId, items) ->
tbl.upsertAll(marshal(items), partitionId);
+ var partitioner = new
PojoStreamerPartitionAwarenessProvider<>(rowConverter.registry(),
tbl.partitions(), marshaller());
+ StreamerBatchSender<R, Integer> batchSender = (partitionId, items) ->
this.tbl.upsertAll(marshal(items), partitionId);
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java
new file mode 100644
index 0000000000..a81aabdfe0
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.lang.MarshallerException;
+
+/**
+ * Converts {@link BinaryRow binary rows} to {@link Row rows} using {@link
SchemaRegistry}.
+ */
+final class TableViewRowConverter {
+
+ private final SchemaRegistry schemaReg;
+
+ TableViewRowConverter(SchemaRegistry schemaReg) {
+ this.schemaReg = schemaReg;
+ }
+
+ SchemaRegistry registry() {
+ return schemaReg;
+ }
+
+ Row resolveRow(BinaryRow binaryRow) {
+ try {
+ return schemaReg.resolve(binaryRow);
+ } catch (SchemaRegistryException e) {
+ throw new MarshallerException(e);
+ }
+ }
+
+ List<Row> resolveKeys(Collection<BinaryRow> rows) {
+ try {
+ return schemaReg.resolveKeys(rows);
+ } catch (SchemaRegistryException e) {
+ throw new MarshallerException(e);
+ }
+ }
+
+ List<Row> resolveRows(Collection<BinaryRow> rows) {
+ try {
+ return schemaReg.resolve(rows);
+ } catch (SchemaRegistryException e) {
+ throw new MarshallerException(e);
+ }
+ }
+}