This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 14234a62c6dec587d5a8cb6a0ff3e3b3a9154409 Author: Brock Noland <[email protected]> AuthorDate: Sun Dec 2 00:42:01 2018 +0000 [java] KUDU-1563. Add support for INSERT_IGNORE Implements java support for the `INSERT_IGNORE' operation. I manually tested this against an old server version without INSERT_IGNORE operation support and it returns the correct error with a message that says: “Unknown operation type: 10”. Change-Id: Ib0cc4a533dfb01a883d347c9795c165aa8efa3fd Reviewed-on: http://gerrit.cloudera.org:8080/4523 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- .../java/org/apache/kudu/client/InsertIgnore.java | 39 +++++++++++ .../java/org/apache/kudu/client/KuduTable.java | 10 +++ .../java/org/apache/kudu/client/Operation.java | 3 +- .../org/apache/kudu/client/TestKuduSession.java | 75 ++++++++++++++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/InsertIgnore.java b/java/kudu-client/src/main/java/org/apache/kudu/client/InsertIgnore.java new file mode 100644 index 0000000..b7be9e5 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/InsertIgnore.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Represents a single row insert ignoring duplicate rows. Instances of this class should not + * be reused. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class InsertIgnore extends Operation { + + InsertIgnore(KuduTable table) { + super(table); + } + + @Override + ChangeType getChangeType() { + return ChangeType.INSERT_IGNORE; + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java index 68dcb6f..5d76bf5 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java @@ -167,6 +167,16 @@ public class KuduTable { } /** + * Get a new insert ignore configured with this table's schema. An insert ignore will + * ignore duplicate row errors. This is useful when the same insert may be sent multiple times. + * The returned object should not be reused. + * @return an insert ignore with this table's schema + */ + public InsertIgnore newInsertIgnore() { + return new InsertIgnore(this); + } + + /** * Asynchronously get all the tablets for this table. 
* @param deadline max time spent in milliseconds for the deferred result of this method to * get called back, if deadline is reached, the deferred result will get erred back diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java index 531c776..cc815d3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java @@ -72,7 +72,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> { EXCLUSIVE_RANGE_LOWER_BOUND( (byte) RowOperationsPB.Type.EXCLUSIVE_RANGE_LOWER_BOUND.getNumber()), INCLUSIVE_RANGE_UPPER_BOUND( - (byte) RowOperationsPB.Type.INCLUSIVE_RANGE_UPPER_BOUND.getNumber()); + (byte) RowOperationsPB.Type.INCLUSIVE_RANGE_UPPER_BOUND.getNumber()), + INSERT_IGNORE((byte) RowOperationsPB.Type.INSERT_IGNORE.getNumber()); ChangeType(byte encodedByte) { this.encodedByte = encodedByte; diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java index c5a651b..828faf9 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java @@ -311,6 +311,70 @@ public class TestKuduSession { } @Test(timeout = 10000) + public void testInsertIgnoreAfterInsertHasNoRowError() throws Exception { + KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + KuduSession session = client.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + + session.apply(createInsert(table, 1)); + session.apply(createUpsert(table, 1, 1, false)); + session.apply(createInsertIgnore(table, 1)); + List<OperationResponse> results = session.flush(); + for (OperationResponse result : results) { + assertFalse(result.toString(), 
result.hasRowError()); + } + List<String> rowStrings = scanTableToStrings(table); + assertEquals(1, rowStrings.size()); + assertEquals( + "INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " + + "STRING column3_s=a string, BOOL column4_b=true", + rowStrings.get(0)); + } + + @Test(timeout = 10000) + public void testInsertAfterInsertIgnoreHasRowError() throws Exception { + KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + KuduSession session = client.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + + session.apply(createInsertIgnore(table, 1)); + session.apply(createInsert(table, 1)); + List<OperationResponse> results = session.flush(); + assertFalse(results.get(0).toString(), results.get(0).hasRowError()); + assertTrue(results.get(1).toString(), results.get(1).hasRowError()); + assertTrue(results.get(1).getRowError().getErrorStatus().isAlreadyPresent()); + List<String> rowStrings = scanTableToStrings(table); + assertEquals(1, rowStrings.size()); + assertEquals( + "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + + "STRING column3_s=a string, BOOL column4_b=true", + rowStrings.get(0)); + } + + @Test(timeout = 10000) + public void testInsertIgnore() throws Exception { + KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + KuduSession session = client.newSession(); + + // Test insert ignore implements normal insert. + assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError()); + List<String> rowStrings = scanTableToStrings(table); + assertEquals( + "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + + "STRING column3_s=a string, BOOL column4_b=true", + rowStrings.get(0)); + + // Test insert ignore does not return a row error. 
+ assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError()); + rowStrings = scanTableToStrings(table); + assertEquals( + "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + + "STRING column3_s=a string, BOOL column4_b=true", + rowStrings.get(0)); + + } + + @Test(timeout = 10000) public void testInsertManualFlushNonCoveredRange() throws Exception { CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange(); createOptions.setNumReplicas(1); @@ -458,4 +522,15 @@ public class TestKuduSession { row.addInt(0, key); return delete; } + + protected InsertIgnore createInsertIgnore(KuduTable table, int key) { + InsertIgnore insertIgnore = table.newInsertIgnore(); + PartialRow row = insertIgnore.getRow(); + row.addInt(0, key); + row.addInt(1, 2); + row.addInt(2, 3); + row.addString(3, "a string"); + row.addBoolean(4, true); + return insertIgnore; + } }
