This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f64ed2aac KUDU-1945: Support non unique primary key for Java client
f64ed2aac is described below
commit f64ed2aac40515ae46132a9bc3cdf7ad5b3f33de
Author: wzhou-code <[email protected]>
AuthorDate: Fri Jan 13 17:48:24 2023 -0800
KUDU-1945: Support non unique primary key for Java client
This patch adds new APIs to create ColumnSchema with non unique
primary key for Java client. When a table with non unique primary
key is created, Auto-Incrementing Column "auto_incrementing_id" will
be added automatically to the table as the key column. The non-unique
key columns and the auto-incrementing column together form the
effective primary key.
UPSERT/UPSERT_IGNORE operations are not supported now for Kudu table
with auto-incrementing column due to limitation in Kudu server.
Auto-Incrementing column cannot be added, removed or renamed with
Alter Table APIs.
Testing:
- Added unit-test for Java client library.
- Manually ran integration test with Impala for creating table
with non unique primary key, and ran queries for operations:
describe/insert/update/delete/upsert/CTAS/select/alter, etc.
Passed Kudu related end-to-end tests.
Change-Id: I7e2501d6b3d66f6466959e4f3f1ed0f5e08dfe5c
Reviewed-on: http://gerrit.cloudera.org:8080/19384
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
.../main/java/org/apache/kudu/ColumnSchema.java | 138 ++++++++++++-
.../src/main/java/org/apache/kudu/Schema.java | 91 +++++++++
.../org/apache/kudu/client/AlterTableOptions.java | 25 +++
.../org/apache/kudu/client/AsyncKuduScanner.java | 2 +
.../java/org/apache/kudu/client/KuduTable.java | 10 +
.../org/apache/kudu/client/ProtobufHelper.java | 66 +++++--
.../java/org/apache/kudu/TestColumnSchema.java | 33 ++++
.../org/apache/kudu/client/TestKuduClient.java | 219 +++++++++++++++++++++
.../java/org/apache/kudu/client/TestKuduTable.java | 102 ++++++++++
.../java/org/apache/kudu/test/ClientTestUtil.java | 9 +
10 files changed, 676 insertions(+), 19 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 62e399550..26b679deb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -39,8 +39,10 @@ public class ColumnSchema {
private final String name;
private final Type type;
private final boolean key;
+ private final boolean keyUnique;
private final boolean nullable;
private final boolean immutable;
+ private final boolean autoIncrementing;
private final Object defaultValue;
private final int desiredBlockSize;
private final Encoding encoding;
@@ -104,7 +106,8 @@ public class ColumnSchema {
}
}
- private ColumnSchema(String name, Type type, boolean key, boolean nullable,
boolean immutable,
+ private ColumnSchema(String name, Type type, boolean key, boolean keyUnique,
+ boolean nullable, boolean immutable, boolean
autoIncrementing,
Object defaultValue, int desiredBlockSize, Encoding
encoding,
CompressionAlgorithm compressionAlgorithm,
ColumnTypeAttributes typeAttributes, Common.DataType
wireType,
@@ -112,8 +115,10 @@ public class ColumnSchema {
this.name = name;
this.type = type;
this.key = key;
+ this.keyUnique = keyUnique;
this.nullable = nullable;
this.immutable = immutable;
+ this.autoIncrementing = autoIncrementing;
this.defaultValue = defaultValue;
this.desiredBlockSize = desiredBlockSize;
this.encoding = encoding;
@@ -148,6 +153,14 @@ public class ColumnSchema {
return key;
}
+ /**
+ * Answers if the key is unique
+ * @return true if the key is unique
+ */
+ public boolean isKeyUnique() {
+ return keyUnique;
+ }
+
/**
* Answers if the column can be set to null
* @return true if it can be set to null, else false
@@ -164,6 +177,14 @@ public class ColumnSchema {
return immutable;
}
+ /**
+ * Answers if the column is auto-incrementing column
+ * @return true if the column value is automatically assigned with
incrementing value
+ */
+ public boolean isAutoIncrementing() {
+ return autoIncrementing;
+ }
+
/**
* The Java object representation of the default value that's read
* @return the default read value
@@ -239,6 +260,8 @@ public class ColumnSchema {
return Objects.equals(name, that.name) &&
Objects.equals(type, that.type) &&
Objects.equals(key, that.key) &&
+ Objects.equals(keyUnique, that.keyUnique) &&
+ Objects.equals(autoIncrementing, that.autoIncrementing) &&
Objects.equals(typeAttributes, that.typeAttributes) &&
Objects.equals(comment, that.comment);
}
@@ -276,6 +299,7 @@ public class ColumnSchema {
private final String name;
private final Type type;
private boolean key = false;
+ private boolean keyUnique = false;
private boolean nullable = false;
private boolean immutable = false;
private Object defaultValue = null;
@@ -290,8 +314,14 @@ public class ColumnSchema {
* Constructor for the required parameters.
* @param name column's name
* @param type column's type
+ * @throws IllegalArgumentException if the column's name equals the
reserved
+ * auto-incrementing column name
*/
public ColumnSchemaBuilder(String name, Type type) {
+ if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Column name " +
+ Schema.getAutoIncrementingColumnName() + " is reserved by Kudu
engine");
+ }
this.name = name;
this.type = type;
}
@@ -304,6 +334,7 @@ public class ColumnSchema {
this.name = that.name;
this.type = that.type;
this.key = that.key;
+ this.keyUnique = that.keyUnique;
this.nullable = that.nullable;
this.immutable = that.immutable;
this.defaultValue = that.defaultValue;
@@ -317,11 +348,25 @@ public class ColumnSchema {
/**
* Sets if the column is part of the row key. False by default.
+ * This function call overrides any previous key() and nonUniqueKey() call.
* @param key a boolean that indicates if the column is part of the key
* @return this instance
*/
public ColumnSchemaBuilder key(boolean key) {
this.key = key;
+ this.keyUnique = key ? true : false;
+ return this;
+ }
+
+ /**
+ * Sets if the column is part of the row non unique key. False by default.
+ * This function call overrides any previous key() and nonUniqueKey() call.
+ * @param key a boolean that indicates if the column is a part of the non
unique key
+ * @return this instance
+ */
+ public ColumnSchemaBuilder nonUniqueKey(boolean key) {
+ this.key = key;
+ this.keyUnique = false;
return this;
}
@@ -456,10 +501,97 @@ public class ColumnSchema {
}
}
- return new ColumnSchema(name, type,
- key, nullable, immutable, defaultValue,
+ return new ColumnSchema(name, type, key, keyUnique, nullable, immutable,
+ /* autoIncrementing */false, defaultValue,
desiredBlockSize, encoding, compressionAlgorithm,
typeAttributes, wireType, comment);
}
}
+
+ /**
+ * Builder for ColumnSchema of the auto-incrementing column. It's used
internally in Kudu
+ * client library.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static class AutoIncrementingColumnSchemaBuilder {
+ private final String name;
+ private final Type type;
+ private int desiredBlockSize = 0;
+ private Encoding encoding = null;
+ private CompressionAlgorithm compressionAlgorithm = null;
+ private Common.DataType wireType = null;
+ private String comment = "";
+
+ /**
+ * Constructor with default parameter values for {@link ColumnSchema}.
+ */
+ public AutoIncrementingColumnSchemaBuilder() {
+ this.name = Schema.getAutoIncrementingColumnName();
+ this.type = Schema.getAutoIncrementingColumnType();
+ }
+
+ /**
+ * Set the desired block size for this column.
+ */
+ public AutoIncrementingColumnSchemaBuilder desiredBlockSize(int
desiredBlockSize) {
+ this.desiredBlockSize = desiredBlockSize;
+ return this;
+ }
+
+ /**
+ * Set the block encoding for this column. This function should be called
when
+ * fetching column schema from Kudu server.
+ */
+ public AutoIncrementingColumnSchemaBuilder encoding(Encoding encoding) {
+ this.encoding = encoding;
+ return this;
+ }
+
+ /**
+ * Set the compression algorithm for this column. This function should be
called
+ * when fetching column schema from Kudu server.
+ */
+ public AutoIncrementingColumnSchemaBuilder compressionAlgorithm(
+ CompressionAlgorithm compressionAlgorithm) {
+ this.compressionAlgorithm = compressionAlgorithm;
+ return this;
+ }
+
+ /**
+ * Allows an alternate {@link Common.DataType} to override the {@link Type}
+ * when serializing the ColumnSchema on the wire.
+ * This is useful for virtual columns specified by their type such as
+ * {@link Common.DataType#IS_DELETED}.
+ */
+ @InterfaceAudience.Private
+ public AutoIncrementingColumnSchemaBuilder wireType(Common.DataType
wireType) {
+ this.wireType = wireType;
+ return this;
+ }
+
+ /**
+ * Set the comment for this column.
+ */
+ public AutoIncrementingColumnSchemaBuilder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ /**
+ * Builds a {@link ColumnSchema} for auto-incrementing column with passed
parameters.
+ * @return a new {@link ColumnSchema}
+ */
+ public ColumnSchema build() {
+ // Set the wire type if it wasn't explicitly set.
+ if (wireType == null) {
+ this.wireType = type.getDataType(null);
+ }
+ return new ColumnSchema(name, type, /* key */true, /* keyUnique */false,
+ /* nullable */false, /* immutable */false,
+ /* autoIncrementing */true, /* defaultValue
*/null,
+ desiredBlockSize, encoding, compressionAlgorithm,
+ /* typeAttributes */null, wireType, comment);
+ }
+ }
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index f07d0c616..5ff514bf1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -39,6 +39,13 @@ import org.apache.kudu.client.PartialRow;
@InterfaceStability.Evolving
public class Schema {
+ /*
+ * Column name and type of auto_incrementing_id column, which is added by
Kudu engine
+ * automatically if the primary key is not unique.
+ */
+ private static final String AUTO_INCREMENTING_ID_COL_NAME =
"auto_incrementing_id";
+ private static final Type AUTO_INCREMENTING_ID_COL_TYPE = Type.INT64;
+
/**
* Mapping of column index to column.
*/
@@ -71,8 +78,10 @@ public class Schema {
private final int varLengthColumnCount;
private final int rowSize;
+ private final boolean isKeyUnique;
private final boolean hasNullableColumns;
private final boolean hasImmutableColumns;
+ private final boolean hasAutoIncrementingColumn;
private final int isDeletedIndex;
private static final int NO_IS_DELETED_INDEX = -1;
@@ -106,6 +115,54 @@ public class Schema {
"Schema must be constructed with all column IDs, or none.");
}
+ boolean isKeyFound = false;
+ boolean isKeyUnique = false;
+ boolean hasAutoIncrementing = false;
+ int keyColumnCount = 0;
+ int maxColumnId = Integer.MIN_VALUE;
+ // Check if auto-incrementing column should be added into the input
columns list
+ for (int index = 0; index < columns.size(); index++) {
+ final ColumnSchema column = columns.get(index);
+ if (column.isKey()) {
+ keyColumnCount++;
+ if (!isKeyFound) {
+ isKeyFound = true;
+ isKeyUnique = column.isKeyUnique();
+ } else if (isKeyUnique != column.isKeyUnique()) {
+ throw new IllegalArgumentException(
+ "Mixture of unique key and non unique key in a table");
+ }
+ }
+ if (column.isAutoIncrementing()) {
+ if (!hasAutoIncrementing) {
+ hasAutoIncrementing = true;
+ } else {
+ throw new IllegalArgumentException(
+ "More than one columns are set as auto-incrementing columns");
+ }
+ }
+ if (hasColumnIds && maxColumnId < columnIds.get(index).intValue()) {
+ maxColumnId = columnIds.get(index).intValue();
+ }
+ }
+ // Add auto-incrementing column into input columns list if the primary key
is not
+ // unique and auto-incrementing column has not been created.
+ if (keyColumnCount > 0 && !isKeyUnique && !hasAutoIncrementing) {
+ // Build auto-incrementing column
+ ColumnSchema autoIncrementingColumn =
+ new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build();
+ // Make a copy of mutable list of columns, then add an auto-incrementing
+ // column after the columns marked as key columns.
+ columns = new ArrayList<>(columns);
+ Preconditions.checkNotNull(columns);
+ columns.add(keyColumnCount, autoIncrementingColumn);
+ if (hasColumnIds) {
+ columnIds = new ArrayList<>(columnIds);
+ columnIds.add(keyColumnCount, maxColumnId + 1);
+ }
+ hasAutoIncrementing = true;
+ }
+
this.columnsByIndex = ImmutableList.copyOf(columns);
int varLenCnt = 0;
this.columnOffsets = new int[columns.size()];
@@ -154,8 +211,10 @@ public class Schema {
this.varLengthColumnCount = varLenCnt;
this.rowSize = getRowSize(this.columnsByIndex);
+ this.isKeyUnique = isKeyUnique;
this.hasNullableColumns = hasNulls;
this.hasImmutableColumns = hasImmutables;
+ this.hasAutoIncrementingColumn = hasAutoIncrementing;
this.isDeletedIndex = isDeletedIndex;
}
@@ -294,6 +353,38 @@ public class Schema {
return primaryKeyColumns;
}
+ /**
+ * Answers if the primary key is unique for the table
+ * @return true if the key is unique
+ */
+ public boolean isPrimaryKeyUnique() {
+ return this.isKeyUnique;
+ }
+
+ /**
+ * Tells if there's auto-incrementing column
+ * @return true if there's auto-incrementing column, else false.
+ */
+ public boolean hasAutoIncrementingColumn() {
+ return this.hasAutoIncrementingColumn;
+ }
+
+ /**
+ * Get the name of the auto-incrementing column
+ * @return column name of the auto-incrementing column.
+ */
+ public static String getAutoIncrementingColumnName() {
+ return AUTO_INCREMENTING_ID_COL_NAME;
+ }
+
+ /**
+ * Get the type of the auto-incrementing column
+ * @return type of the auto-incrementing column.
+ */
+ public static Type getAutoIncrementingColumnType() {
+ return AUTO_INCREMENTING_ID_COL_TYPE;
+ }
+
/**
* Get a schema that only contains the columns which are part of the key
* @return new schema with only the keys
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index 026c3f6ce..f3d7e4637 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -34,6 +34,7 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Common;
+import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
import org.apache.kudu.master.Master;
@@ -84,6 +85,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions addColumn(ColumnSchema colSchema) {
+ if
(colSchema.getName().equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Column name " +
+ Schema.getAutoIncrementingColumnName() + " is reserved by Kudu
engine");
+ }
if (!colSchema.isNullable() && colSchema.getDefaultValue() == null) {
throw new IllegalArgumentException("A new non-null column must have a
default value");
}
@@ -140,6 +145,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions dropColumn(String name) {
+ if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Cannot remove auto-incrementing
column " +
+ Schema.getAutoIncrementingColumnName());
+ }
AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
step.setType(AlterTableRequestPB.StepType.DROP_COLUMN);
step.setDropColumn(AlterTableRequestPB.DropColumn.newBuilder().setName(name));
@@ -153,6 +162,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions renameColumn(String oldName, String newName) {
+ if (oldName.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Cannot rename auto-incrementing
column " +
+ Schema.getAutoIncrementingColumnName());
+ }
// For backwards compatibility, this uses the RENAME_COLUMN step type.
AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
step.setType(AlterTableRequestPB.StepType.RENAME_COLUMN);
@@ -167,6 +180,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions removeDefault(String name) {
+ if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Auto-incrementing column " +
+ Schema.getAutoIncrementingColumnName() + " does not have default
value");
+ }
AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
step.setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
AlterTableRequestPB.AlterColumn.Builder alterBuilder =
@@ -185,6 +202,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions changeDefault(String name, Object newDefault) {
+ if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Cannot set default value for " +
+ "auto-incrementing column " +
Schema.getAutoIncrementingColumnName());
+ }
if (newDefault == null) {
throw new IllegalArgumentException("newDefault cannot be null: " +
"use removeDefault to clear a default value");
@@ -486,6 +507,10 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions changeImmutable(String name, boolean immutable) {
+ if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) {
+ throw new IllegalArgumentException("Cannot change immutable for " +
+ "auto-incrementing column " +
Schema.getAutoIncrementingColumnName());
+ }
AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
step.setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
AlterTableRequestPB.AlterColumn.Builder alterBuilder =
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 49c188901..4929dd4d8 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -370,6 +370,8 @@ public final class AsyncKuduScanner {
columns.add(getStrippedColumnSchema(originalColumn));
}
} else {
+ // By default, a scanner is created with all columns including
auto-incrementing
+ // column if projected columns are not specified.
columns.addAll(table.getSchema().getColumns());
}
// This is a diff scan so add the IS_DELETED column.
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
index 1346231f3..97672ecc1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
@@ -185,8 +185,13 @@ public class KuduTable {
/**
* Get a new upsert configured with this table's schema. The returned object
should not be reused.
* @return an upsert with this table's schema
+ * @throws UnsupportedOperationException if the table has auto-incrementing
column
*/
public Upsert newUpsert() {
+ if (schema.hasAutoIncrementingColumn()) {
+ throw new UnsupportedOperationException(
+ "Tables with auto-incrementing column do not support UPSERT
operations");
+ }
return new Upsert(this);
}
@@ -195,8 +200,13 @@ public class KuduTable {
* updating immutable cells in a row. This is useful when upserting rows in
a table with immutable
* columns.
* @return an upsert with this table's schema
+ * @throws UnsupportedOperationException if the table has auto-incrementing
column
*/
public UpsertIgnore newUpsertIgnore() {
+ if (schema.hasAutoIncrementingColumn()) {
+ throw new UnsupportedOperationException(
+ "Tables with auto-incrementing column do not support UPSERT_IGNORE
operations");
+ }
return new UpsertIgnore(this);
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index b7e0f77c6..a9c015bb0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -108,6 +108,7 @@ public class ProtobufHelper {
.setIsKey(column.isKey())
.setIsNullable(column.isNullable())
.setImmutable(column.isImmutable())
+ .setIsAutoIncrementing(column.isAutoIncrementing())
.setCfileBlockSize(column.getDesiredBlockSize());
if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId
>= 0) {
@@ -150,26 +151,47 @@ public class ProtobufHelper {
}
public static ColumnSchema pbToColumnSchema(Common.ColumnSchemaPB pb) {
+ return pbToColumnSchema(pb, true);
+ }
+
+ public static ColumnSchema pbToColumnSchema(Common.ColumnSchemaPB pb,
+ boolean isKeyUnique) {
+ ColumnSchema.Encoding encoding =
ColumnSchema.Encoding.valueOf(pb.getEncoding().name());
+ ColumnSchema.CompressionAlgorithm compressionAlgorithm =
+ ColumnSchema.CompressionAlgorithm.valueOf(pb.getCompression().name());
+ int desiredBlockSize = pb.getCfileBlockSize();
+
+ if (pb.getIsAutoIncrementing()) {
+ // Set encoding, compression algorithm, block size and comment from 'pb'
parameter
+ return new ColumnSchema.AutoIncrementingColumnSchemaBuilder()
+ .encoding(encoding)
+ .compressionAlgorithm(compressionAlgorithm)
+ .desiredBlockSize(desiredBlockSize)
+ .comment(pb.getComment())
+ .build();
+ }
+
Type type = Type.getTypeForDataType(pb.getType());
ColumnTypeAttributes typeAttributes = pb.hasTypeAttributes() ?
pbToColumnTypeAttributes(pb.getTypeAttributes()) : null;
Object defaultValue = pb.hasWriteDefaultValue() ?
byteStringToObject(type, typeAttributes, pb.getWriteDefaultValue()) :
null;
- ColumnSchema.Encoding encoding =
ColumnSchema.Encoding.valueOf(pb.getEncoding().name());
- ColumnSchema.CompressionAlgorithm compressionAlgorithm =
- ColumnSchema.CompressionAlgorithm.valueOf(pb.getCompression().name());
- int desiredBlockSize = pb.getCfileBlockSize();
- return new ColumnSchema.ColumnSchemaBuilder(pb.getName(), type)
- .key(pb.getIsKey())
- .nullable(pb.getIsNullable())
- .immutable(pb.getImmutable())
- .defaultValue(defaultValue)
- .encoding(encoding)
- .compressionAlgorithm(compressionAlgorithm)
- .desiredBlockSize(desiredBlockSize)
- .typeAttributes(typeAttributes)
- .comment(pb.getComment())
- .build();
+ ColumnSchema.ColumnSchemaBuilder csb =
+ new ColumnSchema.ColumnSchemaBuilder(pb.getName(), type);
+ if (pb.getIsKey() && isKeyUnique) {
+ csb.key(true);
+ } else {
+ csb.nonUniqueKey(pb.getIsKey());
+ }
+ return csb.nullable(pb.getIsNullable())
+ .immutable(pb.getImmutable())
+ .defaultValue(defaultValue)
+ .encoding(encoding)
+ .compressionAlgorithm(compressionAlgorithm)
+ .desiredBlockSize(desiredBlockSize)
+ .typeAttributes(typeAttributes)
+ .comment(pb.getComment())
+ .build();
}
public static ColumnTypeAttributes
pbToColumnTypeAttributes(Common.ColumnTypeAttributesPB pb) {
@@ -188,10 +210,22 @@ public class ProtobufHelper {
}
public static Schema pbToSchema(Common.SchemaPB schema) {
+ // Since ColumnSchema.keyUnique in run-time structures is not persistent
in Kudu
+ // server, we need to find if the table has auto-incrementing column
first, and set
+ // all key columns as non unique key columns if the table has
auto-incrementing
+ // column.
+ boolean hasAutoIncrementing = false;
+ for (Common.ColumnSchemaPB columnPb : schema.getColumnsList()) {
+ if (columnPb.getIsAutoIncrementing()) {
+ hasAutoIncrementing = true;
+ break;
+ }
+ }
List<ColumnSchema> columns = new ArrayList<>(schema.getColumnsCount());
List<Integer> columnIds = new ArrayList<>(schema.getColumnsCount());
for (Common.ColumnSchemaPB columnPb : schema.getColumnsList()) {
- columns.add(pbToColumnSchema(columnPb));
+ // Key is not unique if hasAutoIncrementing is true.
+ columns.add(pbToColumnSchema(columnPb, !hasAutoIncrementing));
int id = columnPb.getId();
if (id < 0) {
throw new IllegalArgumentException("Illegal column ID: " + id);
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
index a3385a04e..c1fd3c9e5 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
@@ -25,6 +25,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
+import org.apache.kudu.ColumnSchema.AutoIncrementingColumnSchemaBuilder;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.test.junit.RetryRule;
import org.apache.kudu.util.CharUtil;
@@ -69,8 +70,17 @@ public class TestColumnSchema {
ColumnSchema isKey = new ColumnSchemaBuilder("col1", Type.STRING)
.key(true)
.build();
+ Assert.assertTrue(isKey.isKey());
assertNotEquals(stringCol1, isKey);
+ // Difference between key and nonUniqueKey
+ ColumnSchema isNonUniqueKey = new ColumnSchemaBuilder("col1", Type.STRING)
+ .nonUniqueKey(true)
+ .build();
+ Assert.assertTrue(isNonUniqueKey.isKey());
+ Assert.assertFalse(isNonUniqueKey.isKeyUnique());
+ assertNotEquals(isKey, isNonUniqueKey);
+
// Different by type
ColumnSchema isInt = new ColumnSchemaBuilder("col1", Type.INT32)
.build();
@@ -126,4 +136,27 @@ public class TestColumnSchema {
.contains("VARCHAR's length must be set and between 1 and 65535"));
}
+ @Test
+ public void testAutoIncrementing() throws Exception {
+ // Create auto-incrementing column with AutoIncrementingColumnSchemaBuilder
+ ColumnSchema autoIncrementing = new
AutoIncrementingColumnSchemaBuilder().build();
+ Assert.assertTrue(autoIncrementing.isAutoIncrementing());
+ assertEquals(Schema.getAutoIncrementingColumnType(),
autoIncrementing.getType());
+ Assert.assertTrue(autoIncrementing.isKey());
+ Assert.assertFalse(autoIncrementing.isKeyUnique());
+ Assert.assertFalse(autoIncrementing.isNullable());
+ Assert.assertFalse(autoIncrementing.isImmutable());
+ assertEquals(null, autoIncrementing.getDefaultValue());
+
+ // Create column with auto-incrementing column name with
ColumnSchemaBuilder
+ Throwable thrown = Assert.assertThrows(IllegalArgumentException.class, new
ThrowingRunnable() {
+ @Override
+ public void run() throws Exception {
+ new ColumnSchemaBuilder(Schema.getAutoIncrementingColumnName(),
+ Schema.getAutoIncrementingColumnType()).build();
+ }
+ });
+ Assert.assertTrue(thrown.getMessage().contains("Column name " +
+ Schema.getAutoIncrementingColumnName() + " is reserved by Kudu
engine"));
+ }
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 01965daff..917d5aed2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -29,6 +29,7 @@ import static
org.apache.kudu.test.ClientTestUtil.createManyVarcharsSchema;
import static
org.apache.kudu.test.ClientTestUtil.createSchemaWithBinaryColumns;
import static org.apache.kudu.test.ClientTestUtil.createSchemaWithDateColumns;
import static
org.apache.kudu.test.ClientTestUtil.createSchemaWithDecimalColumns;
+import static org.apache.kudu.test.ClientTestUtil.createSchemaWithNonUniqueKey;
import static
org.apache.kudu.test.ClientTestUtil.createSchemaWithTimestampColumns;
import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
import static
org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange;
@@ -64,6 +65,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.stumbleupon.async.Deferred;
import org.junit.Before;
import org.junit.Rule;
@@ -782,6 +784,223 @@ public class TestKuduClient {
}
}
+ /**
+ * Test creating a table with non unique primary key in the table schema.
+ */
+ @Test(timeout = 100000)
+ public void testCreateTableWithNonUniquePrimaryKeys() throws Exception {
+ // Create a schema with non unique primary key column
+ Schema schema = createSchemaWithNonUniqueKey();
+ assertFalse(schema.isPrimaryKeyUnique());
+ // Verify auto-incrementing column is in the schema
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertEquals(3, schema.getColumnCount());
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ assertEquals(1,
schema.getColumnIndex(Schema.getAutoIncrementingColumnName()));
+ // Create a table
+ client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+
+ KuduSession session = client.newSession();
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ // Verify that the primary key is not unique, and an auto-incrementing
column is
+ // added as key column in the position after all key columns.
+ schema = table.getSchema();
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertEquals(3, schema.getColumnCount());
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ assertEquals(1,
schema.getColumnIndex(Schema.getAutoIncrementingColumnName()));
+
assertTrue(schema.getColumn(Schema.getAutoIncrementingColumnName()).isKey());
+ assertTrue(schema.getColumn(
+ Schema.getAutoIncrementingColumnName()).isAutoIncrementing());
+
+ // Insert rows into the table without assigning values for the
auto-incrementing
+ // column.
+ for (int i = 0; i < 3; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addInt("key", i);
+ row.addInt("c1", i * 10);
+ session.apply(insert);
+ }
+ session.flush();
+
+ // Scan all the rows in the table with all columns.
+ // Verify that the auto-incrementing column is included in the rows.
+ List<String> rowStrings = scanTableToStrings(table);
+ assertEquals(3, rowStrings.size());
+ for (int i = 0; i < rowStrings.size(); i++) {
+ StringBuilder expectedRow = new StringBuilder();
+ expectedRow.append(String.format("INT32 key=%d, INT64 %s=%d, INT32
c1=%d",
+ i, Schema.getAutoIncrementingColumnName(), i + 1, i * 10));
+ assertEquals(expectedRow.toString(), rowStrings.get(i));
+ }
+
+ // Update "c1" column of the first row with "key" and auto-incrementing
columns.
+ Update update = table.newUpdate();
+ PartialRow row = update.getRow();
+ row.addInt(schema.getColumnByIndex(0).getName(), 0);
+ row.addLong(schema.getColumnByIndex(1).getName(), 1);
+ row.addInt(schema.getColumnByIndex(2).getName(), 100);
+ session.apply(update);
+ session.flush();
+
+ // Scan all the rows in the table without the auto-incrementing column.
+ // Verify that "c1" column of the first row is updated.
+ KuduScanner.KuduScannerBuilder scanBuilder =
client.newScannerBuilder(table);
+ KuduScanner scanner =
+ scanBuilder.setProjectedColumnNames(Lists.newArrayList("key",
"c1")).build();
+ rowStrings.clear();
+ for (RowResult r : scanner) {
+ rowStrings.add(r.rowToString());
+ }
+ Collections.sort(rowStrings);
+ assertEquals(3, rowStrings.size());
+ for (int i = 0; i < rowStrings.size(); i++) {
+ StringBuilder expectedRow = new StringBuilder();
+ if (i == 0) {
+ expectedRow.append(String.format("INT32 key=0, INT32 c1=100"));
+ } else {
+ expectedRow.append(String.format("INT32 key=%d, INT32 c1=%d", i, i *
10));
+ }
+ assertEquals(expectedRow.toString(), rowStrings.get(i));
+ }
+
+ // Delete the first row with "key" and auto-incrementing columns.
+ // Verify that number of rows is decreased by 1.
+ Delete delete = table.newDelete();
+ row = delete.getRow();
+ row.addInt(schema.getColumnByIndex(0).getName(), 0);
+ row.addLong(schema.getColumnByIndex(1).getName(), 1);
+ session.apply(delete);
+ session.flush();
+ assertEquals(2, countRowsInScan(client.newScannerBuilder(table).build()));
+
+ // Check that we can delete the table.
+ client.deleteTable(TABLE_NAME);
+ }
+
+ /**
+ * Test operations for table with auto-incrementing column.
+ */
+ @Test(timeout = 100000)
+ public void testTableWithAutoIncrementingColumn() throws Exception {
+ // Create a schema with non unique primary key column
+ Schema schema = createSchemaWithNonUniqueKey();
+ assertFalse(schema.isPrimaryKeyUnique());
+ // Verify auto-incrementing column is in the schema
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertEquals(3, schema.getColumnCount());
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ // Create a table
+ client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+
+ final KuduSession session = client.newSession();
+ KuduTable table = client.openTable(TABLE_NAME);
+ schema = table.getSchema();
+ assertTrue(schema.hasAutoIncrementingColumn());
+
+ // Verify that UPSERT is not allowed for table with auto-incrementing
column
+ try {
+ table.newUpsert();
+ fail("UPSERT on table with auto-incrementing column");
+ } catch (UnsupportedOperationException e) {
+ assertTrue(e.getMessage().contains(
+ "Tables with auto-incrementing column do not support UPSERT
operations"));
+ }
+
+ // Verify that UPSERT_IGNORE is not allowed for table with
auto-incrementing column
+ try {
+ table.newUpsertIgnore();
+ fail("UPSERT_IGNORE on table with auto-incrementing column");
+ } catch (UnsupportedOperationException e) {
+ assertTrue(e.getMessage().contains(
+ "Tables with auto-incrementing column do not support UPSERT_IGNORE
operations"));
+ }
+
+ // Change desired block size for auto-incrementing column
+ client.alterTable(TABLE_NAME, new
AlterTableOptions().changeDesiredBlockSize(
+ Schema.getAutoIncrementingColumnName(), 1));
+ // Change encoding for auto-incrementing column
+ client.alterTable(TABLE_NAME, new AlterTableOptions().changeEncoding(
+ Schema.getAutoIncrementingColumnName(),
ColumnSchema.Encoding.PLAIN_ENCODING));
+ // Change compression algorithm for auto-incrementing column
+ client.alterTable(TABLE_NAME, new
AlterTableOptions().changeCompressionAlgorithm(
+ Schema.getAutoIncrementingColumnName(),
ColumnSchema.CompressionAlgorithm.NO_COMPRESSION));
+ session.flush();
+
+ // Verify that auto-incrementing column cannot be added
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn(
+ Schema.getAutoIncrementingColumnName(),
Schema.getAutoIncrementingColumnType(), 0));
+ fail("Add auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Column name " +
+ Schema.getAutoIncrementingColumnName() + " is reserved by Kudu
engine"));
+ }
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn(
+ new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build()));
+ fail("Add auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Column name " +
+ Schema.getAutoIncrementingColumnName() + " is reserved by Kudu
engine"));
+ }
+
+ // Verify that auto-incrementing column cannot be removed
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().dropColumn(
+ Schema.getAutoIncrementingColumnName()));
+ fail("Drop auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Cannot remove auto-incrementing
column " +
+ Schema.getAutoIncrementingColumnName()));
+ }
+
+ // Verify that auto-incrementing column cannot be renamed
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().renameColumn(
+ Schema.getAutoIncrementingColumnName(), "new_auto_incrementing"));
+ fail("Rename auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Cannot rename auto-incrementing
column " +
+ Schema.getAutoIncrementingColumnName()));
+ }
+
+ // Verify that auto-incrementing column cannot be changed by removing
default
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().removeDefault(
+ Schema.getAutoIncrementingColumnName()));
+ fail("Remove default value for auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Auto-incrementing column " +
+ Schema.getAutoIncrementingColumnName() + " does not have default
value"));
+ }
+
+ // Verify that auto-incrementing column cannot be changed with default
value
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().changeDefault(
+ Schema.getAutoIncrementingColumnName(), 0));
+ fail("Change default value for auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Cannot set default value for " +
+ "auto-incrementing column " +
Schema.getAutoIncrementingColumnName()));
+ }
+
+ // Verify that auto-incrementing column cannot be changed for its immutable
+ try {
+ client.alterTable(TABLE_NAME, new AlterTableOptions().changeImmutable(
+ Schema.getAutoIncrementingColumnName(), true));
+ fail("Change immutable for auto-incrementing column");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Cannot change immutable for " +
+ "auto-incrementing column " +
Schema.getAutoIncrementingColumnName()));
+ }
+
+ client.deleteTable(TABLE_NAME);
+ }
+
/**
* Test inserting and retrieving rows from a table that has a range partition
* with custom hash schema.
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 72fb70c90..6f5469241 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -2560,4 +2560,106 @@ public class TestKuduTable {
".* server sent error unsupported feature flags"));
}
}
+
+ /**
+ * Test creating table schemas with non unique primary key columns and
+ * auto-incrementing columns.
+ */
+ @Test(timeout = 100000)
+ public void testCreateSchemaWithNonUniquePrimaryKeys() throws Exception {
+ // Create a schema with two non unique primary key columns and
+ // verify the resulting table's schema.
+ ArrayList<ColumnSchema> columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+ .nonUniqueKey(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key2", Type.INT64)
+ .nonUniqueKey(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+ .nullable(true).build());
+ Schema schema = new Schema(columns);
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertEquals(4, schema.getColumnCount());
+ assertEquals(3, schema.getPrimaryKeyColumnCount());
+ client.createTable(tableName, schema, getBasicCreateTableOptions());
+ KuduTable table = client.openTable(tableName);
+ schema = table.getSchema();
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertEquals(4, schema.getColumnCount());
+ assertEquals(3, schema.getPrimaryKeyColumnCount());
+ client.deleteTable(tableName);
+
+ // Create a schema with non unique primary key column and unique primary
key column
+ columns.clear();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+ .nonUniqueKey(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key2", Type.INT32)
+ .key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+ .nullable(true).build());
+ try {
+ new Schema(columns);
+ fail("Schema with mixture of unique key and non unique key");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains(
+ "Mixture of unique key and non unique key in a table"));
+ }
+
+ // Create a schema with an auto-incrementing column which is marked as non
unique
+ // primary key and verify the resulting table's schema.
+ columns.clear();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+ .nonUniqueKey(true).build());
+ columns.add(new
ColumnSchema.AutoIncrementingColumnSchemaBuilder().build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+ .nullable(true).build());
+ schema = new Schema(columns);
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertEquals(3, schema.getColumnCount());
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ client.createTable(tableName, schema, getBasicCreateTableOptions());
+ table = client.openTable(tableName);
+ schema = table.getSchema();
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertEquals(3, schema.getColumnCount());
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ client.deleteTable(tableName);
+
+ // Create a schema with a single auto-incrementing column which is marked
as non
+ // unique primary key, and verify the resulting table's schema.
+ columns.clear();
+ columns.add(new
ColumnSchema.AutoIncrementingColumnSchemaBuilder().build());
+ schema = new Schema(columns);
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertEquals(1, schema.getColumnCount());
+ assertEquals(1, schema.getPrimaryKeyColumnCount());
+ CreateTableOptions builder = new CreateTableOptions();
+
builder.setRangePartitionColumns(ImmutableList.of(Schema.getAutoIncrementingColumnName()));
+ client.createTable(tableName, schema, builder);
+ table = client.openTable(tableName);
+ schema = table.getSchema();
+ assertTrue(schema.hasAutoIncrementingColumn());
+ assertFalse(schema.isPrimaryKeyUnique());
+ assertEquals(1, schema.getColumnCount());
+ assertEquals(1, schema.getPrimaryKeyColumnCount());
+ client.deleteTable(tableName);
+
+ // Create a schema with two auto-incrementing columns
+ columns.clear();
+ columns.add(new
ColumnSchema.AutoIncrementingColumnSchemaBuilder().build());
+ columns.add(new
ColumnSchema.AutoIncrementingColumnSchemaBuilder().build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+ .nullable(true).build());
+ try {
+ new Schema(columns);
+ fail("Schema with two auto-incrementing columns");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains(
+ "More than one columns are set as auto-incrementing columns"));
+ }
+ }
}
diff --git
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index 8cb7d28b8..72b7a2690 100644
---
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -497,4 +497,13 @@ public abstract class ClientTestUtil {
.nullable(true).immutable(true).build());
return new Schema(columns);
}
+
+ public static Schema createSchemaWithNonUniqueKey() {
+ ArrayList<ColumnSchema> columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key",
Type.INT32).nonUniqueKey(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1",
Type.INT32).nullable(true)
+ .build());
+ return new Schema(columns);
+ }
}