This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push:
new 453d3aa55 KUDU-3483 Fix flushing data in batch when table schema
changed
453d3aa55 is described below
commit 453d3aa5515d73ab7df8a41a38ed56cbfdfbc6b9
Author: xinghuayu007 <[email protected]>
AuthorDate: Tue May 30 17:03:44 2023 +0800
KUDU-3483 Fix flushing data in batch when table schema changed
In auto_flush_background or manual_flush mode, applying an operation
first inserts the row into the buffer. When the buffer is full or
flush() is called, the client tries to flush multiple rows to the
Kudu server. First, it groups the rows by tablet id into batches; a
batch may contain multiple rows that belong to the same tablet. Each
batch is then encoded into bytes. At that point, the encoder reads the
table schema of the first row and uses it to decide the format of the
data. If two rows belong to the same tablet but have different schemas,
which may happen when the table is altered between the two inserts,
encoding fails with an array index out-of-bounds exception.
This patch validates the schemas of the rows that belong to the same
tablet. If the schemas differ, the rows are put into separate batches
instead of a single one.
Change-Id: Ie6501962b32814d121f180b2942999c402d927db
Reviewed-on: http://gerrit.cloudera.org:8080/19949
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
(cherry picked from commit c9e6e36a742c1164bf20a1913a50b9bd03807ed7)
Reviewed-on: http://gerrit.cloudera.org:8080/20622
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Wang Xixu <[email protected]>
Reviewed-by: Yifan Zhang <[email protected]>
---
.../src/main/java/org/apache/kudu/Schema.java | 30 ++++
.../org/apache/kudu/client/AsyncKuduSession.java | 52 +++++--
java/kudu-client/src/test/java/org/TestSchema.java | 135 +++++++++++++++++
.../org/apache/kudu/client/TestAlterTable.java | 168 ++++++++++++++++++++-
4 files changed, 366 insertions(+), 19 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index 5ff514bf1..335bd512b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -458,4 +459,33 @@ public class Schema {
Preconditions.checkState(hasIsDeleted(), "Schema doesn't have an
IS_DELETED columns");
return isDeletedIndex;
}
+
+  /**
+   * Compares this schema with another object for structural equality.
+   *
+   * <p>Two schemas are considered equal when they have the same number of primary key
+   * columns, the same number of columns, and pairwise-equal columns at every index.
+   *
+   * @param obj the object to compare against
+   * @return {@code true} if {@code obj} is a {@code Schema} with the same structure
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Schema)) {
+      return false;
+    }
+    final Schema other = (Schema) obj;
+    final int numColumns = getColumns().size();
+    if (getPrimaryKeyColumnCount() != other.getPrimaryKeyColumnCount() ||
+        numColumns != other.getColumns().size()) {
+      return false;
+    }
+    // Column order matters: columns are compared position by position.
+    for (int idx = 0; idx < numColumns; idx++) {
+      if (!getColumnByIndex(idx).equals(other.getColumnByIndex(idx))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(varLengthColumnCount, rowSize, isKeyUnique,
hasNullableColumns,
+ hasImmutableColumns, hasAutoIncrementingColumn);
+ }
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index aa449d391..f0d641786 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -344,7 +344,8 @@ public class AsyncKuduSession implements
SessionConfiguration {
/**
* Callback which waits for all tablet location lookups to complete, groups
all operations into
- * batches by tablet, and dispatches them. When all of the batches are
complete, a deferred is
+ * batches by tablet (operations on the same tablet whose table schemas differ go into
+ * separate batches), and dispatches them. When all of the batches are complete, a deferred is
* fired and the buffer is added to the inactive queue.
*/
private final class TabletLookupCB implements Callback<Void, Object> {
@@ -367,8 +368,10 @@ public class AsyncKuduSession implements
SessionConfiguration {
// The final tablet lookup is complete. Batch all of the buffered
// operations into their respective tablet, and then send the batches.
- // Group the operations by tablet.
- Map<Slice, Batch> batches = new HashMap<>();
+ // Group the operations by tablet. If two operations belong to the same
+ // tablet but have different table schemas, they will be put into two
+ // separate batches.
+ Map<Slice, List<Batch>> batches = new HashMap<>();
List<OperationResponse> opsFailedInLookup = new ArrayList<>();
List<Integer> opsFailedIndexesList = new ArrayList<>();
@@ -403,27 +406,48 @@ public class AsyncKuduSession implements
SessionConfiguration {
LocatedTablet tablet = bufferedOp.getTablet();
Slice tabletId = new Slice(tablet.getTabletId());
- Batch batch = batches.get(tabletId);
- if (batch == null) {
- batch = new Batch(operation.getTable(), tablet,
ignoreAllDuplicateRows,
+ List<Batch> batchList = batches.get(tabletId);
+ if (batchList == null) {
+ Batch batch = new Batch(operation.getTable(), tablet,
ignoreAllDuplicateRows,
ignoreAllNotFoundRows, txnId);
- batches.put(tabletId, batch);
+ batch.add(operation, currentIndex++);
+ List<Batch> list = new ArrayList<>();
+ list.add(batch);
+ batches.put(tabletId, list);
+ continue;
+ }
+ // Compare against the schema of the most recent batch only: consecutive
+ // operations most likely share the same schema.
+ Batch lastBatch = batchList.get(batchList.size() - 1);
+ if (lastBatch.operations.get(0).table.getSchema()
+ .equals(operation.table.getSchema())) {
+ lastBatch.add(operation, currentIndex++);
+ continue;
}
+ // Put it into a separate batch.
+ Batch batch = new Batch(operation.getTable(), tablet,
ignoreAllDuplicateRows,
+ ignoreAllNotFoundRows, txnId);
batch.add(operation, currentIndex++);
+ batchList.add(batch);
}
-
- List<Deferred<BatchResponse>> batchResponses = new
ArrayList<>(batches.size() + 1);
+ int batchSize = 0;
+ for (List<Batch> batchList : batches.values()) {
+ batchSize += batchList.size();
+ }
+ List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batchSize
+ 1);
if (!opsFailedInLookup.isEmpty()) {
batchResponses.add(
Deferred.fromResult(new BatchResponse(opsFailedInLookup,
opsFailedIndexesList)));
}
- for (Batch batch : batches.values()) {
- if (timeoutMillis != 0) {
- batch.resetTimeoutMillis(client.getTimer(), timeoutMillis);
+ for (List<Batch> batchList : batches.values()) {
+ for (Batch batch : batchList) {
+ if (timeoutMillis != 0) {
+ batch.resetTimeoutMillis(client.getTimer(), timeoutMillis);
+ }
+ addBatchCallbacks(batch);
+ batchResponses.add(client.sendRpcToTablet(batch));
}
- addBatchCallbacks(batch);
- batchResponses.add(client.sendRpcToTablet(batch));
}
// On completion of all batches, fire the completion deferred, and add
the buffer
diff --git a/java/kudu-client/src/test/java/org/TestSchema.java
b/java/kudu-client/src/test/java/org/TestSchema.java
new file mode 100644
index 000000000..9fecd7c38
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/TestSchema.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Schema#equals(Object)}.
+ */
+public class TestSchema {
+
+  /**
+   * Builds a {@link Schema} from the given columns, in the given order.
+   *
+   * <p>Every schema gets its own freshly built column list, so one test case cannot
+   * accidentally populate or mutate the list that belongs to another.
+   */
+  private static Schema schemaOf(ColumnSchema... cols) {
+    ArrayList<ColumnSchema> columns = new ArrayList<>();
+    for (ColumnSchema col : cols) {
+      columns.add(col);
+    }
+    return new Schema(columns);
+  }
+
+  /**
+   * Checks that two schemas compare equal only when they agree on the number of primary
+   * key columns, the number of columns, and the definition of the column at each position.
+   */
+  @Test
+  public void testEquals() {
+    final ColumnSchema col1 = new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32)
+        .nullable(false)
+        .key(true)
+        .build();
+    final ColumnSchema col2 = new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+        .nullable(false)
+        .build();
+
+    final Schema schema = schemaOf(col1, col2);
+    final Schema schema1 = schemaOf(col1, col2);
+
+    // Two objects are the same.
+    assertTrue(schema1.equals(schema1));
+    // One of the objects is not of type 'Schema'.
+    final ArrayList<ColumnSchema> notASchema = new ArrayList<>();
+    notASchema.add(col1);
+    notASchema.add(col2);
+    assertFalse(schema1.equals(notASchema));
+    // Two schemas have the same structure.
+    assertTrue(schema1.equals(schema));
+
+    final ColumnSchema col3 = new ColumnSchema.ColumnSchemaBuilder("c2", Type.INT32)
+        .nullable(false)
+        .key(true)
+        .build();
+    final Schema schema2 = schemaOf(col1, col3);
+
+    // Two schemas have different number of primary keys.
+    assertFalse(schema1.equals(schema2));
+
+    final Schema schema3 = schemaOf(col1, col2, col3);
+
+    // Two schemas have different number of columns.
+    assertFalse(schema1.equals(schema3));
+
+    final ColumnSchema col4 = new ColumnSchema.ColumnSchemaBuilder("c3", Type.INT32)
+        .nullable(false)
+        .build();
+    final Schema schema4 = schemaOf(col1, col2, col4);
+
+    final ColumnSchema col5 = new ColumnSchema.ColumnSchemaBuilder("c4", Type.INT32)
+        .nullable(false)
+        .build();
+    final Schema schema5 = schemaOf(col1, col2, col5);
+
+    // Two schemas have different column names.
+    assertFalse(schema4.equals(schema5));
+
+    final ColumnSchema col6 = new ColumnSchema.ColumnSchemaBuilder("c4", Type.STRING)
+        .nullable(false)
+        .build();
+    final Schema schema6 = schemaOf(col1, col2, col6);
+
+    // Two schemas have different column types.
+    assertFalse(schema5.equals(schema6));
+
+    final Schema schema7 = schemaOf(col1, col6, col2);
+
+    // Two schemas have different sequence of columns.
+    assertFalse(schema6.equals(schema7));
+
+    final ColumnSchema col7 = new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+        .nullable(true)
+        .build();
+    // Two schemas with exactly the same types, names, and sequence of columns,
+    // but different nullability for a non-key column.
+    // NOTE(review): this expects ColumnSchema#equals to take nullability into
+    // account — confirm.
+    final Schema schema8 = schemaOf(col1, col6, col7);
+
+    assertFalse(schema7.equals(schema8));
+  }
+}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
index 4654517c9..d1f41c829 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
@@ -68,7 +68,7 @@ public class TestAlterTable {
* with the provided bounds.
*/
private KuduTable createTable(List<Pair<Integer, Integer>> bounds) throws
KuduException {
- return createTable(bounds, null);
+ return createTable(bounds, null, 2);
}
/**
@@ -76,7 +76,8 @@ public class TestAlterTable {
* The table is hash partitioned on c0 into two buckets, and range
partitioned
* with the provided bounds and the specified owner.
*/
- private KuduTable createTable(List<Pair<Integer, Integer>> bounds, String
owner)
+ private KuduTable createTable(List<Pair<Integer, Integer>> bounds, String
owner,
+ int buckets)
throws KuduException {
// Create initial table with single range partition covering the entire key
// space, and two hash buckets.
@@ -92,8 +93,10 @@ public class TestAlterTable {
CreateTableOptions createOptions =
new
CreateTableOptions().setRangePartitionColumns(ImmutableList.of("c0"))
- .setNumReplicas(1)
- .addHashPartitions(ImmutableList.of("c0"), 2);
+ .setNumReplicas(1);
+ if (buckets > 1) {
+ createOptions = createOptions.addHashPartitions(ImmutableList.of("c0"),
buckets);
+ }
for (Pair<Integer, Integer> bound : bounds) {
PartialRow lower = schema.newPartialRow();
@@ -132,6 +135,161 @@ public class TestAlterTable {
assertEquals(String.format("row errors: %s", Arrays.toString(rowErrors)),
0, rowErrors.length);
}
+ private int countRows(KuduTable table) throws KuduException {
+ KuduScanner scanner = client.newScannerBuilder(table).build();
+ int rowCount = 0;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator it = scanner.nextRows();
+ rowCount += it.getNumRows();
+ }
+ return rowCount;
+ }
+
+ // This unit test is used to verify the problem KUDU-3483. Without the fix,
+ // this unit test will throw an out of index exception.
+ @Test
+ public void testInsertDataWithChangedSchema() throws Exception {
+ // Create a table with single partition in order to make all operations
+ // fall into the same tablet.
+ KuduTable table = createTable(ImmutableList.of(), null, 1);
+ final KuduSession session = client.newSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+ // Test case with the same table schema.
+ {
+ Insert insert = table.newInsert();
+ PartialRow row1 = insert.getRow();
+ row1.addInt("c0", 101);
+ row1.addInt("c1", 101);
+ session.apply(insert);
+
+ Upsert upsert = table.newUpsert();
+ PartialRow row2 = upsert.getRow();
+ row2.addInt("c0", 102);
+ row2.addInt("c1", 102);
+ session.apply(upsert);
+ List<OperationResponse> responses = session.flush();
+ assertEquals(responses.size(), 2);
+
+ RowError[] rowErrors = session.getPendingErrors().getRowErrors();
+ assertEquals(String.format("row errors: %s",
+ Arrays.toString(rowErrors)), 0, rowErrors.length);
+ assertEquals(2, countRows(table));
+ }
+
+ // Test case with adding columns.
+ {
+ // Upsert a row with the old schema.
+ Upsert upsert1 = table.newUpsert();
+ PartialRow row1 = upsert1.getRow();
+ row1.addInt("c0", 103);
+ row1.addInt("c1", 103);
+ session.apply(upsert1);
+
+ // Add one new column.
+ client.alterTable(tableName, new AlterTableOptions()
+ .addColumn("addNonNull", Type.INT32, 100));
+
+ // Reopen the table with new schema.
+ table = client.openTable(tableName);
+ assertEquals(3, table.getSchema().getColumnCount());
+
+ // Upsert a row with the new schema.
+ Upsert upsert2 = table.newUpsert();
+ PartialRow row2 = upsert2.getRow();
+ row2.addInt("c0", 104);
+ row2.addInt("c1", 104);
+ row2.addInt("addNonNull", 101);
+
+ session.apply(upsert2);
+ List<OperationResponse> responses = session.flush();
+ assertEquals(responses.size(), 2);
+
+ RowError[] rowErrors = session.getPendingErrors().getRowErrors();
+ assertEquals(String.format("row errors: %s",
+ Arrays.toString(rowErrors)), 0, rowErrors.length);
+
+ // Read the data. It contains 4 rows.
+ assertEquals(4, countRows(table));
+ }
+
+ // Test case with renamed columns.
+ {
+ table = client.openTable(tableName);
+ // Upsert a row.
+ Upsert upsert1 = table.newUpsert();
+ PartialRow row1 = upsert1.getRow();
+ row1.addInt("c0", 105);
+ row1.addInt("c1", 105);
+ row1.addInt("addNonNull", 101);
+ session.apply(upsert1);
+
+ // Rename one column.
+ client.alterTable(tableName, new AlterTableOptions()
+ .renameColumn("addNonNull", "newAddNonNull"));
+
+ // Reopen the table with the new schema.
+ table = client.openTable(tableName);
+ assertEquals(3, table.getSchema().getColumnCount());
+
+ // Upsert a row with the new schema.
+ Upsert upsert2 = table.newUpsert();
+ PartialRow row2 = upsert2.getRow();
+ row2.addInt("c0", 106);
+ row2.addInt("c1", 106);
+ row2.addInt("newAddNonNull", 101);
+ session.apply(upsert2);
+ List<OperationResponse> responses = session.flush();
+ assertEquals(responses.size(), 2);
+
+ RowError[] rowErrors = session.getPendingErrors().getRowErrors();
+ assertEquals(String.format("row errors: %s",
+ Arrays.toString(rowErrors)), 1, rowErrors.length);
+ assertTrue(Arrays.toString(rowErrors)
+ .contains("Client provided column addNonNull INT32 NOT NULL not
present in tablet"));
+
+ // Read the data. It contains 5 rows, one row failed to insert.
+ assertEquals(5, countRows(table));
+ }
+
+ // Test case with drop columns.
+ {
+ // Upsert a row.
+ Upsert upsert1 = table.newUpsert();
+ PartialRow row1 = upsert1.getRow();
+ row1.addInt("c0", 107);
+ row1.addInt("c1", 107);
+ row1.addInt("newAddNonNull", 101);
+ session.apply(upsert1);
+
+ // Drop one column.
+ client.alterTable(tableName, new AlterTableOptions()
+ .dropColumn("newAddNonNull"));
+
+ // Reopen the table with the new schema.
+ table = client.openTable(tableName);
+ assertEquals(2, table.getSchema().getColumnCount());
+
+ // Upsert a row with the new schema.
+ Upsert upsert2 = table.newUpsert();
+ PartialRow row2 = upsert2.getRow();
+ row2.addInt("c0", 108);
+ row2.addInt("c1", 108);
+ session.apply(upsert2);
+ List<OperationResponse> responses = session.flush();
+ assertEquals(responses.size(), 2);
+
+ RowError[] rowErrors = session.getPendingErrors().getRowErrors();
+ assertEquals(String.format("row errors: %s",
+ Arrays.toString(rowErrors)), 1, rowErrors.length);
+ assertTrue(Arrays.toString(rowErrors)
+ .contains("Client provided column newAddNonNull INT32 NOT NULL not
present in tablet"));
+
+ // Read the data. It contains 6 rows, one row failed to insert.
+ assertEquals(6, countRows(table));
+ }
+ }
+
@Test
public void testAlterAddColumns() throws Exception {
KuduTable table = createTable(ImmutableList.of());
@@ -1065,7 +1223,7 @@ public class TestAlterTable {
@Test
public void testAlterChangeOwner() throws Exception {
String originalOwner = "alice";
- KuduTable table = createTable(ImmutableList.of(), originalOwner);
+ KuduTable table = createTable(ImmutableList.of(), originalOwner, 2);
assertEquals(originalOwner, table.getOwner());
String newOwner = "bob";