This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 764d8b0bc8 [Improve][API] Make sure the table name in TablePath not be
null (#7252)
764d8b0bc8 is described below
commit 764d8b0bc8a12100bf2ef07c9b1488ba2bda70ab
Author: Jia Fan <[email protected]>
AuthorDate: Wed Aug 7 12:46:30 2024 +0800
[Improve][API] Make sure the table name in TablePath not be null (#7252)
---
.../seatunnel/api/sink/DefaultSaveModeHandler.java | 7 +--
.../api/table/catalog/TableIdentifier.java | 18 ++++++--
.../seatunnel/api/table/catalog/TablePath.java | 16 +++++--
.../seatunnel/api/sink/TablePlaceholderTest.java | 10 ++--
.../deserialize/GoogleSheetsDeserializerTest.java | 2 +-
.../seatunnel/http/source/HttpSource.java | 3 +-
.../pulsar/source/PulsarCanalDecoratorTest.java | 2 +-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 4 +-
.../server/task/SeaTunnelSourceCollector.java | 16 ++-----
.../engine/server/task/flow/SinkFlowLifeCycle.java | 14 ++----
.../format/avro/AvroSerializationSchemaTest.java | 2 +-
.../format/json/JsonRowDataSerDeSchemaTest.java | 18 ++++----
.../json/canal/CanalJsonSerDeSchemaTest.java | 54 +++++++++++-----------
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 42 ++++++++---------
.../json/maxwell/MaxWellJsonSerDeSchemaTest.java | 2 +-
.../format/json/ogg/OggJsonSerDeSchemaTest.java | 42 ++++++++---------
16 files changed, 129 insertions(+), 123 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index bbbe99281b..e22dd7c99a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -153,19 +153,16 @@ public class DefaultSaveModeHandler implements
SaveModeHandler {
protected void createTable() {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
- TablePath databasePath = TablePath.of(tablePath.getDatabaseName(),
"");
try {
log.info(
"Creating database {} with action {}",
tablePath.getDatabaseName(),
catalog.previewAction(
- Catalog.ActionType.CREATE_DATABASE,
- databasePath,
- Optional.empty()));
+ Catalog.ActionType.CREATE_DATABASE, tablePath,
Optional.empty()));
} catch (UnsupportedOperationException ignore) {
log.info("Creating database {}", tablePath.getDatabaseName());
}
- catalog.createDatabase(databasePath, true);
+ catalog.createDatabase(tablePath, true);
}
try {
log.info(
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index 2d39f9b984..101081255c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -17,15 +17,16 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.NonNull;
import java.io.Serializable;
@Getter
@EqualsAndHashCode
-@RequiredArgsConstructor
public final class TableIdentifier implements Serializable {
private static final long serialVersionUID = 1L;
@@ -35,7 +36,18 @@ public final class TableIdentifier implements Serializable {
private final String schemaName;
- private final String tableName;
+ @NonNull private final String tableName;
+
+ public TableIdentifier(
+ String catalogName, String databaseName, String schemaName,
@NonNull String tableName) {
+ this.catalogName = catalogName;
+ this.databaseName = databaseName;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException("tableName cannot be empty");
+ }
+ }
public static TableIdentifier of(String catalogName, String databaseName,
String tableName) {
return new TableIdentifier(catalogName, databaseName, null, tableName);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 1257262187..30edc7ac80 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -17,9 +17,11 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.NonNull;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,12 +29,20 @@ import java.util.List;
@Getter
@EqualsAndHashCode
-@RequiredArgsConstructor
public final class TablePath implements Serializable {
private static final long serialVersionUID = 1L;
private final String databaseName;
private final String schemaName;
- private final String tableName;
+ @NonNull private final String tableName;
+
+ public TablePath(String databaseName, String schemaName, @NonNull String
tableName) {
+ this.databaseName = databaseName;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException("tableName cannot be empty");
+ }
+ }
public static final TablePath DEFAULT = TablePath.of("default", "default",
"default");
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
index 1a87a53f97..16a69d5db3 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
@@ -77,7 +77,7 @@ public class TablePlaceholderTest {
@Test
public void testSinkOptionsWithNoTablePath() {
ReadonlyConfig config = createConfig();
- CatalogTable table = createTestTableWithNoTablePath();
+ CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(config, table);
Assertions.assertEquals("xyz_default_db_test",
newConfig.get(DATABASE));
@@ -95,7 +95,7 @@ public class TablePlaceholderTest {
@Test
public void testSinkOptionsWithExcludeKeys() {
ReadonlyConfig config = createConfig();
- CatalogTable table = createTestTableWithNoTablePath();
+ CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(
config, table, Arrays.asList(DATABASE.key()));
@@ -116,7 +116,7 @@ public class TablePlaceholderTest {
public void testSinkOptionsWithMultiTable() {
ReadonlyConfig config = createConfig();
CatalogTable table1 = createTestTable();
- CatalogTable table2 = createTestTableWithNoTablePath();
+ CatalogTable table2 = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig1 =
TablePlaceholder.replaceTablePlaceholder(config, table1,
Arrays.asList());
ReadonlyConfig newConfig2 =
@@ -159,8 +159,8 @@ public class TablePlaceholderTest {
return ReadonlyConfig.fromMap(configMap);
}
- private static CatalogTable createTestTableWithNoTablePath() {
- TableIdentifier tableId = TableIdentifier.of("my-catalog", null, null,
null);
+ private static CatalogTable createTestTableWithNoDatabaseAndSchemaName() {
+ TableIdentifier tableId = TableIdentifier.of("my-catalog", null, null,
"default_table");
TableSchema tableSchema =
TableSchema.builder()
.primaryKey(PrimaryKey.of("my-pk", Arrays.asList("f1",
"f2")))
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java
b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java
index c55228471c..e2e3139d88 100644
---
a/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java
@@ -42,7 +42,7 @@ public class GoogleSheetsDeserializerTest {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
final DeserializationSchema<SeaTunnelRow> deser =
new JsonDeserializationSchema(catalogTables, false, false);
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 754a7b9366..c41e8a9a84 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -146,7 +147,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
} else {
TableIdentifier tableIdentifier =
- TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null,
null);
+ TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY,
TablePath.DEFAULT);
TableSchema tableSchema =
TableSchema.builder()
.column(
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
index 7b1ee39fd4..ee5e1513fb 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
@@ -58,7 +58,7 @@ public class PulsarCanalDecoratorTest {
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames,
dataTypes);
CatalogTable catalogTables =
- CatalogTableUtil.getCatalogTable("", "", "", "",
seaTunnelRowType);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
seaTunnelRowType);
CanalJsonDeserializationSchema canalJsonDeserializationSchema =
CanalJsonDeserializationSchema.builder(catalogTables).build();
PulsarCanalDecorator pulsarCanalDecorator =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 0d9f5d5ef8..6e67aa021d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -412,7 +412,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
};
SeaTunnelRowType fake_source_row_type = new
SeaTunnelRowType(fieldNames, fieldTypes);
CatalogTable catalogTable =
- CatalogTableUtil.getCatalogTable("", "", "", "",
fake_source_row_type);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
fake_source_row_type);
AvroDeserializationSchema avroDeserializationSchema =
new AvroDeserializationSchema(catalogTable);
List<SeaTunnelRow> kafkaSTRow =
@@ -464,7 +464,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
CatalogTable catalogTable =
- CatalogTableUtil.getCatalogTable("", "", "", "",
SEATUNNEL_ROW_TYPE);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
SEATUNNEL_ROW_TYPE);
AvroDeserializationSchema avroDeserializationSchema =
new AvroDeserializationSchema(catalogTable);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 62612d0617..e1b2494789 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -102,9 +102,11 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
tablePaths.forEach(
tablePath ->
sourceReceivedCountPerTable.put(
- getFullName(tablePath),
+ tablePath.getFullName(),
metricsContext.counter(
- SOURCE_RECEIVED_COUNT + "#" +
getFullName(tablePath))));
+ SOURCE_RECEIVED_COUNT
+ + "#"
+ +
tablePath.getFullName())));
}
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
@@ -131,7 +133,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
if (StringUtils.isNotEmpty(tableId)) {
- String tableName = getFullName(TablePath.of(tableId));
+ String tableName = TablePath.of(tableId).getFullName();
Counter sourceTableCounter =
sourceReceivedCountPerTable.get(tableName);
if (Objects.nonNull(sourceTableCounter)) {
sourceTableCounter.inc();
@@ -232,12 +234,4 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
}
}
}
-
- private String getFullName(TablePath tablePath) {
- if (StringUtils.isBlank(tablePath.getTableName())) {
- tablePath =
- TablePath.of(tablePath.getDatabaseName(),
tablePath.getSchemaName(), "default");
- }
- return tablePath.getFullName();
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 516e1c97c4..de8257f1e9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -138,9 +138,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
sinkTables.forEach(
tablePath ->
sinkWriteCountPerTable.put(
- getFullName(tablePath),
+ tablePath.getFullName(),
metricsContext.counter(
- SINK_WRITE_COUNT + "#" +
getFullName(tablePath))));
+ SINK_WRITE_COUNT + "#" +
tablePath.getFullName())));
}
}
@@ -275,7 +275,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
sinkWriteBytesPerSeconds.markEvent(size);
String tableId = ((SeaTunnelRow)
record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
- String tableName = getFullName(TablePath.of(tableId));
+ String tableName = TablePath.of(tableId).getFullName();
Counter sinkTableCounter =
sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
@@ -345,12 +345,4 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
((SupportResourceShare)
this.writer).setMultiTableResourceManager(resourceManager, 0);
}
}
-
- private String getFullName(TablePath tablePath) {
- if (StringUtils.isBlank(tablePath.getTableName())) {
- tablePath =
- TablePath.of(tablePath.getDatabaseName(),
tablePath.getSchemaName(), "default");
- }
- return tablePath.getFullName();
- }
}
diff --git
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 1e1554be71..42b8029f16 100644
---
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -160,7 +160,7 @@ class AvroSerializationSchemaTest {
@Test
public void testSerialization() throws IOException {
SeaTunnelRowType rowType = buildSeaTunnelRowType();
- CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "",
"", "", rowType);
+ CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "",
"", "test", rowType);
SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
AvroSerializationSchema serializationSchema = new
AvroSerializationSchema(rowType);
byte[] bytes = serializationSchema.serialize(seaTunnelRow);
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index fb6fd9da76..beda96ff6e 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -170,7 +170,7 @@ public class JsonRowDataSerDeSchemaTest {
new MapType(STRING_TYPE, new
MapType(STRING_TYPE, INT_TYPE))
})
});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);
@@ -230,7 +230,7 @@ public class JsonRowDataSerDeSchemaTest {
new SeaTunnelDataType[] {STRING_TYPE,
INT_TYPE})
});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);
@@ -308,7 +308,7 @@ public class JsonRowDataSerDeSchemaTest {
new MapType(STRING_TYPE, DOUBLE_TYPE)
});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", rowType);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", rowType);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, true);
@@ -327,7 +327,7 @@ public class JsonRowDataSerDeSchemaTest {
public void testDeserializationNullRow() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, true, false);
@@ -339,7 +339,7 @@ public class JsonRowDataSerDeSchemaTest {
public void testDeserializationMissingNode() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, true, false);
@@ -359,7 +359,7 @@ public class JsonRowDataSerDeSchemaTest {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
// pass on missing field
final JsonDeserializationSchema deser =
@@ -382,7 +382,7 @@ public class JsonRowDataSerDeSchemaTest {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
// fail on missing field
final JsonDeserializationSchema deser =
@@ -418,7 +418,7 @@ public class JsonRowDataSerDeSchemaTest {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
SeaTunnelRow expected = new SeaTunnelRow(1);
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
// ignore on parse error
final JsonDeserializationSchema deser =
@@ -446,7 +446,7 @@ public class JsonRowDataSerDeSchemaTest {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
- CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "", schema);
+ CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "",
"", "test", schema);
String noJson = "{]";
final JsonDeserializationSchema deser =
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index d35849e8bd..efd639cd7b 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -54,7 +54,7 @@ public class CanalJsonSerDeSchemaTest {
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE,
STRING_TYPE, FLOAT_TYPE});
private static final CatalogTable catalogTables =
- CatalogTableUtil.getCatalogTable("", "", "", "",
SEATUNNEL_ROW_TYPE);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
SEATUNNEL_ROW_TYPE);
@Test
public void testFilteringTables() throws Exception {
@@ -167,32 +167,32 @@ public class CanalJsonSerDeSchemaTest {
List<String> expected =
Arrays.asList(
- "SeaTunnelRow{tableId=.., kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[106,
hammer, null, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[109, spare
tire, 24 inch spare tire, 22.2]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[106,
hammer, null, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[107, rocks,
box of assorted rocks, 5.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[101,
scooter, Small 2-wheel scooter, 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[102, car
battery, 12V car battery, 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-D, fields=[102, car
battery, 12V car battery, 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-D, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}");
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[106,
hammer, null, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[109,
spare tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[106,
hammer, null, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[107,
rocks, box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[101,
scooter, Small 2-wheel scooter, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[102,
car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[102,
car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}");
List<String> actual =
collector.list.stream().map(Object::toString).collect(Collectors.toList());
assertEquals(expected, actual);
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 67d499efd9..a970aea55a 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -55,7 +55,7 @@ public class DebeziumJsonSerDeSchemaTest {
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE,
STRING_TYPE, FLOAT_TYPE});
private static final CatalogTable catalogTables =
- CatalogTableUtil.getCatalogTable("", "", "", "",
SEATUNNEL_ROW_TYPE);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
SEATUNNEL_ROW_TYPE);
@Test
void testNullRowMessages() throws Exception {
@@ -175,26 +175,26 @@ public class DebeziumJsonSerDeSchemaTest {
List<String> expected =
Arrays.asList(
- "SeaTunnelRow{tableId=.., kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[109, spare
tire, 24 inch spare tire, 22.2]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[107, rocks,
box of assorted rocks, 5.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}");
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[109,
spare tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[107,
rocks, box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}");
List<String> actual =
collector.list.stream().map(Object::toString).collect(Collectors.toList());
assertEquals(expected, actual);
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
index a4e06ac2b1..f82b272cf7 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
@@ -50,7 +50,7 @@ public class MaxWellJsonSerDeSchemaTest {
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE,
STRING_TYPE, FLOAT_TYPE});
private static final CatalogTable catalogTables =
- CatalogTableUtil.getCatalogTable("", "", "", "",
SEATUNNEL_ROW_TYPE);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
SEATUNNEL_ROW_TYPE);
@Test
public void testFilteringTables() throws Exception {
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
index 04fea16eca..20df0d945a 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
@@ -55,7 +55,7 @@ public class OggJsonSerDeSchemaTest {
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE,
STRING_TYPE, FLOAT_TYPE});
private static final CatalogTable catalogTables =
- CatalogTableUtil.getCatalogTable("", "", "", "",
SEATUNNEL_ROW_TYPE);
+ CatalogTableUtil.getCatalogTable("", "", "", "test",
SEATUNNEL_ROW_TYPE);
@Test
public void testFilteringTables() throws Exception {
@@ -172,26 +172,26 @@ public class OggJsonSerDeSchemaTest {
List<String> expected =
Arrays.asList(
- "SeaTunnelRow{tableId=.., kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[109, spare
tire, 24 inch spare tire, 22.2]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[107, rocks,
box of assorted rocks, 5.1]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
- "SeaTunnelRow{tableId=.., kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=.., kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=.., kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}");
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[109,
spare tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[107,
rocks, box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}");
List<String> actual =
collector.list.stream().map(Object::toString).collect(Collectors.toList());
assertEquals(expected, actual);