This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit b7cb29ee55fce92ea804002867ba3e7c68bc1129
Author: Xinyi Yan <[email protected]>
AuthorDate: Tue Sep 29 16:11:41 2020 -0700

    PHOENIX-6167 Adding maxMutationCellSizeBytes config and exception
    
    Signed-off-by: Xinyi Yan <[email protected]>
---
 .../apache/phoenix/end2end/MutationStateIT.java    | 74 ++++++++++++++++++++++
 .../org/apache/phoenix/compile/UpsertCompiler.java | 47 ++++++++++++--
 .../apache/phoenix/exception/SQLExceptionCode.java | 11 ++++
 .../apache/phoenix/exception/SQLExceptionInfo.java | 29 +++++++++
 .../org/apache/phoenix/query/QueryServices.java    |  1 +
 .../apache/phoenix/query/QueryServicesOptions.java |  1 +
 .../MaxPhoenixColumnSizeExceededException.java     | 46 ++++++++++++++
 7 files changed, 203 insertions(+), 6 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index f6a9993..acaa56a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -550,4 +551,77 @@ public class MutationStateIT extends 
ParallelStatsDisabledIT {
             assertNotNull(PhoenixRuntime.getTableNoCache(conn, tableName));
         }
     }
+
+    /**
+     * Runs the oversized-column upsert scenario against a table declared with the
+     * SINGLE_CELL_ARRAY_WITH_OFFSETS immutable storage scheme.
+     */
+    @Test
+    public void testUpsertMaxColumnAllowanceForSingleCellArrayWithOffsets() throws Exception {
+        final String storageScheme = "SINGLE_CELL_ARRAY_WITH_OFFSETS";
+        testUpsertColumnExceedsMaxAllowanceSize(storageScheme);
+    }
+
+    /**
+     * Runs the oversized-column upsert scenario against a table declared with the
+     * ONE_CELL_PER_COLUMN immutable storage scheme.
+     */
+    @Test
+    public void testUpsertMaxColumnAllowanceForOneCellPerColumn() throws Exception {
+        final String storageScheme = "ONE_CELL_PER_COLUMN";
+        testUpsertColumnExceedsMaxAllowanceSize(storageScheme);
+    }
+
+    /**
+     * Upserts two rows into an immutable table created with the given storage scheme while
+     * hbase.client.keyvalue.maxsize is set to 20 on the connection. The first row carries a
+     * 10-character PAYLOAD2 value and must succeed; the second carries a 20-character value.
+     * For ONE_CELL_PER_COLUMN the second upsert must fail with
+     * MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED, and the message must name the primary key
+     * columns and values but none of the payload values. For any other scheme it must succeed.
+     *
+     * @param storageScheme value used for IMMUTABLE_STORAGE_SCHEME in the CREATE TABLE statement
+     * @throws Exception if the connection, DDL, or an upsert fails unexpectedly
+     */
+    public void testUpsertColumnExceedsMaxAllowanceSize(String storageScheme) throws Exception {
+        // Only ONE_CELL_PER_COLUMN tables are expected to reject the oversized value.
+        boolean expectSizeViolation = storageScheme.equals("ONE_CELL_PER_COLUMN");
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, "20");
+        try (PhoenixConnection connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
+            String fullTableName = generateUniqueName();
+            String pk1Name = generateUniqueName();
+            String pk2Name = generateUniqueName();
+            String ddl = "CREATE IMMUTABLE TABLE " + fullTableName +
+                    " (" +  pk1Name + " VARCHAR(15) NOT NULL, " + pk2Name + " VARCHAR(15) NOT NULL, " +
+                    "PAYLOAD1 VARCHAR, PAYLOAD2 VARCHAR,PAYLOAD3 VARCHAR " +
+                    "CONSTRAINT PK PRIMARY KEY (" + pk1Name + "," + pk2Name+ ")) " +
+                    "IMMUTABLE_STORAGE_SCHEME =" + storageScheme;
+            try (Statement stmt = connection.createStatement()) {
+                stmt.execute(ddl);
+            }
+            // The fifth bind parameter targets PAYLOAD3; the column list previously repeated PAYLOAD2.
+            String sql = "UPSERT INTO " + fullTableName +
+                    " ("+ pk1Name + ","+ pk2Name + ",PAYLOAD1,PAYLOAD2,PAYLOAD3) VALUES (?,?,?,?,?)";
+            String pk1Value = generateUniqueName();
+            String pk2Value = generateUniqueName();
+            String payload1Value = generateUniqueName();
+            String payload3Value = generateUniqueName();
+
+            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                // A 10-character value is below the configured limit of 20 and must be accepted.
+                preparedStatement.setString(1, pk1Value);
+                preparedStatement.setString(2, pk2Value);
+                preparedStatement.setString(3, payload1Value);
+                preparedStatement.setString(4, "1234567890");
+                preparedStatement.setString(5, payload3Value);
+                preparedStatement.execute();
+
+                try {
+                    // A 20-character value reaches the configured limit.
+                    preparedStatement.setString(1, pk1Value);
+                    preparedStatement.setString(2, pk2Value);
+                    preparedStatement.setString(3, payload1Value);
+                    preparedStatement.setString(4, "12345678901234567890");
+                    preparedStatement.setString(5, payload3Value);
+                    preparedStatement.execute();
+                    if (expectSizeViolation) {
+                        fail("Expected the oversized upsert to be rejected for " + storageScheme);
+                    }
+                } catch (SQLException e) {
+                    if (!expectSizeViolation) {
+                        // Rethrow so the unexpected failure is reported with its original stack trace.
+                        throw e;
+                    }
+                    assertEquals(SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED.getErrorCode(),
+                            e.getErrorCode());
+                    assertTrue(e.getMessage().contains(
+                            SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED.getMessage()));
+                    assertTrue(e.getMessage().contains(
+                            connectionProperties.getProperty(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE)));
+                    assertTrue(e.getMessage().contains(pk1Name));
+                    assertTrue(e.getMessage().contains(pk2Name));
+                    assertTrue(e.getMessage().contains(pk1Value));
+                    assertTrue(e.getMessage().contains(pk2Value));
+                    // Payload values must not leak into the error message.
+                    assertFalse(e.getMessage().contains(payload1Value));
+                    assertFalse(e.getMessage().contains(payload3Value));
+                }
+            }
+        }
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index ce28e2c..83c575f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -84,6 +84,7 @@ import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -120,9 +121,9 @@ import com.google.common.collect.Sets;
 public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] 
columnIndexes,
-            PTable table, MultiRowMutationState mutation,
-            PhoenixStatement statement, boolean useServerTimestamp, 
IndexMaintainer maintainer,
-            byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) 
throws SQLException {
+            PTable table, MultiRowMutationState mutation, PhoenixStatement 
statement, boolean useServerTimestamp,
+            IndexMaintainer maintainer, byte[][] viewConstants, byte[] 
onDupKeyBytes, int numSplColumns,
+            int maxHBaseClientKeyValueSize) throws SQLException {
         long columnValueSize = 0;
         Map<PColumn,byte[]> columnValues = 
Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
@@ -139,6 +140,13 @@ public class UpsertCompiler {
         for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
             byte[] value = values[j];
             PColumn column = table.getColumns().get(columnIndexes[i]);
+            if (value.length >= maxHBaseClientKeyValueSize &&
+                    table.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                String rowkeyAndColumnInfo = 
getExceedMaxHBaseClientKeyValueAllowanceRowkeyAndColumnInfo(
+                        values, columnIndexes, table, numSplColumns, 
column.getName().getString());
+                throw new 
MaxPhoenixColumnSizeExceededException(rowkeyAndColumnInfo, 
maxHBaseClientKeyValueSize, value.length);
+            }
+
             if (SchemaUtil.isPKColumn(column)) {
                 pkValues[pkSlotIndex[i]] = value;
                 if (SchemaUtil.getPKPosition(table, column) == 
table.getRowTimestampColPos()) {
@@ -174,7 +182,27 @@ public class UpsertCompiler {
         } 
         mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, 
statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, 
onDupKeyBytes));
     }
-    
+
+    /**
+     * Builds the detail text naming the table, the column and the row key involved when an
+     * upserted value exceeds the HBase client key-value size allowance. Only primary key
+     * columns contribute to the row key text; each is rendered as NAME=value (value decoded
+     * with Bytes.toString) and the pairs are joined with " AND ".
+     *
+     * @param values upsert values; entries before numSplColumns are not inspected
+     * @param columnIndexes positions in table.getColumns() for the values starting at numSplColumns
+     * @param table table being upserted into
+     * @param numSplColumns number of leading entries of values to skip
+     * @param columnName name of the column reported in the text
+     * @return the formatted table, column and row key description
+     */
+    public static String getExceedMaxHBaseClientKeyValueAllowanceRowkeyAndColumnInfo(
+            byte[][] values, int[] columnIndexes, PTable table, int numSplColumns, String columnName) {
+        StringBuilder rowKeyText = new StringBuilder();
+        int columnSlot = 0;
+        for (int valueSlot = numSplColumns; valueSlot < values.length; valueSlot++) {
+            PColumn candidate = table.getColumns().get(columnIndexes[columnSlot++]);
+            if (!SchemaUtil.isPKColumn(candidate)) {
+                continue;
+            }
+            if (rowKeyText.length() > 0) {
+                rowKeyText.append(" AND ");
+            }
+            rowKeyText.append(candidate.getName().toString())
+                    .append("=")
+                    .append(Bytes.toString(values[valueSlot]));
+        }
+        String qualifiedTableName = SchemaUtil.getTableName(
+                table.getSchemaName().toString(), table.getTableName().toString());
+        return String.format("Upsert data to table %s on Column %s exceed max HBase client keyvalue size allowance, " +
+                        "the rowkey is %s",
+                qualifiedTableName, columnName, rowKeyText.toString());
+    }
+
     public static MutationState upsertSelect(StatementContext childContext, 
TableRef tableRef,
             RowProjector projector, ResultIterator iterator, int[] 
columnIndexes,
             int[] pkSlotIndexes, boolean useServerTimestamp,
@@ -187,6 +215,9 @@ public class UpsertCompiler {
         int maxSizeBytes =
                 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
                     QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
+        int maxHBaseClientKeyValueSize =
+                
services.getProps().getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
+                        
QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
         int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         // we automatically flush the mutations when either auto commit is 
enabled, or
         // the target table is transactional (in that case changes are not 
visible until we commit)
@@ -253,7 +284,7 @@ public class UpsertCompiler {
                 }
                 setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement,
                         useServerTimestamp, indexMaintainer, viewConstants, 
null,
-                        numSplColumns);
+                        numSplColumns, maxHBaseClientKeyValueSize);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our 
batch size
                 if (autoFlush && rowCount % batchSize == 0) {
@@ -1267,7 +1298,11 @@ public class UpsertCompiler {
                 indexMaintainer = table.getIndexMaintainer(parentTable, 
connection);
                 viewConstants = IndexUtil.getViewConstants(parentTable);
             }
-            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, 
statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 
0);
+            int maxHBaseClientKeyValueSize = 
statement.getConnection().getQueryServices().getProps().
+                    getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
+                            
QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
+            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, 
statement, useServerTimestamp,
+                    indexMaintainer, viewConstants, onDupKeyBytes, 0, 
maxHBaseClientKeyValueSize);
             return new MutationState(tableRef, mutation, 0, maxSize, 
maxSizeBytes, connection);
         }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 3d48856..f50f5cd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -35,6 +35,7 @@ import 
org.apache.phoenix.schema.ConcurrentTableMutationException;
 import org.apache.phoenix.schema.FunctionAlreadyExistsException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.IndexNotFoundException;
+import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
 import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException;
 import org.apache.phoenix.schema.MaxMutationSizeExceededException;
 import org.apache.phoenix.schema.PTable;
@@ -516,6 +517,16 @@ public enum SQLExceptionCode {
     NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create 
connection " +
             "because the internal connections already has the maximum number" +
             " of connections to the target cluster."),
+    MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732,
+            "LIM03", "The Phoenix Column size is bigger than maximum " +
+            "HBase client key value allowed size for ONE_CELL_PER_COLUMN 
table, " +
+            "try upserting column in smaller value", new Factory() {
+        @Override
+        public SQLException newException(SQLExceptionInfo info) {
+            return new 
MaxPhoenixColumnSizeExceededException(info.getMessage(), 
info.getMaxPhoenixColumnSizeBytes(),
+                    info.getPhoenixColumnSizeBytes());
+        }
+    }),
     INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."),
     HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"),
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
index 4681ac3..4d13bff 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
@@ -42,6 +42,8 @@ public class SQLExceptionInfo {
     public static final String MUTATION_SIZE = "mutationSize";
     public static final String MAX_MUTATION_SIZE_BYTES = 
"maxMutationSizeBytes";
     public static final String MUTATION_SIZE_BYTES = "mutationSizeBytes";
+    public static final String MAX_PHOENIX_COLUMN_SIZE_BYTES = 
"maxPhoenixColumnSizeBytes";
+    public static final String PHOENIX_COLUMN_SIZE_BYTES = 
"phoenixColumnSizeBytes";
 
     private final Throwable rootCause;
     private final SQLExceptionCode code; // Should always have one.
@@ -55,6 +57,8 @@ public class SQLExceptionInfo {
     private final int mutationSize;
     private final long maxMutationSizeBytes;
     private final long mutationSizeBytes;
+    private final int phoenixColumnSizeBytes;
+    private final int maxPhoenixColumnSizeBytes;
 
     public static class Builder {
 
@@ -70,6 +74,8 @@ public class SQLExceptionInfo {
         private int mutationSize;
         private long maxMutationSizeBytes;
         private long mutationSizeBytes;
+        private int phoenixColumnSizeBytes;
+        private int maxPhoenixColumnSizeBytes;
 
         public Builder(SQLExceptionCode code) {
             this.code = code;
@@ -130,6 +136,16 @@ public class SQLExceptionInfo {
             return this;
         }
 
+        /**
+         * Records the size, in bytes, of the column value being reported.
+         *
+         * @param phoenixColumnSizeBytes size of the column value in bytes
+         * @return this builder, to allow call chaining
+         */
+        public Builder setPhoenixColumnSizeBytes(int phoenixColumnSizeBytes) {
+            this.phoenixColumnSizeBytes = phoenixColumnSizeBytes;
+            return this;
+        }
+
+        /**
+         * Records the maximum column value size, in bytes, that was allowed.
+         *
+         * @param maxPhoenixColumnSizeBytes allowed maximum size in bytes
+         * @return this builder, to allow call chaining
+         */
+        public Builder setMaxPhoenixColumnSizeBytes(int maxPhoenixColumnSizeBytes) {
+            this.maxPhoenixColumnSizeBytes = maxPhoenixColumnSizeBytes;
+            return this;
+        }
+
         public SQLExceptionInfo build() {
             return new SQLExceptionInfo(this);
         }
@@ -153,6 +169,8 @@ public class SQLExceptionInfo {
         mutationSize = builder.mutationSize;
         maxMutationSizeBytes = builder.maxMutationSizeBytes;
         mutationSizeBytes = builder.mutationSizeBytes;
+        maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes;
+        phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes;
     }
 
     @Override
@@ -188,6 +206,10 @@ public class SQLExceptionInfo {
                     append(maxMutationSizeBytes);
             builder.append(" 
").append(MUTATION_SIZE_BYTES).append("=").append(mutationSizeBytes);
         }
+        if (maxPhoenixColumnSizeBytes != 0) {
+            builder.append(" 
").append(MAX_PHOENIX_COLUMN_SIZE_BYTES).append("=").append(maxPhoenixColumnSizeBytes);
+            builder.append(" 
").append(PHOENIX_COLUMN_SIZE_BYTES).append("=").append(phoenixColumnSizeBytes);
+        }
         return builder.toString();
     }
 
@@ -243,4 +265,11 @@ public class SQLExceptionInfo {
         return mutationSizeBytes;
     }
 
+    /**
+     * @return the maximum column value size in bytes copied from the builder; 0 if it was never set
+     */
+    public int getMaxPhoenixColumnSizeBytes() {
+        return this.maxPhoenixColumnSizeBytes;
+    }
+
+    /**
+     * @return the column value size in bytes copied from the builder; 0 if it was never set
+     */
+    public int getPhoenixColumnSizeBytes() {
+        return this.phoenixColumnSizeBytes;
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 5a601c2..f97e06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -97,6 +97,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String SCAN_CACHE_SIZE_ATTRIB = 
"hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = 
"phoenix.mutate.maxSize";
     public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = 
"phoenix.mutate.maxSizeBytes";
+    public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = 
"hbase.client.keyvalue.maxsize";
 
     public static final String MUTATE_BATCH_SIZE_ATTRIB = 
"phoenix.mutate.batchSize";
     public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = 
"phoenix.mutate.batchSizeBytes";
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3773174..1f354a5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -148,6 +148,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
     public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
     public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES =  104857600; // 
100 Mb
+    public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE =  10485760; 
// 10 Mb
     public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
     public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows 
may be updated
     public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta 
data also.
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java
new file mode 100644
index 0000000..500ac15
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+import java.sql.SQLException;
+
+/**
+ * Exception thrown when the value upserted into a Phoenix column is too large for the
+ * maximum HBase client key value size that is allowed. It always carries the SQL state and
+ * error code of {@link SQLExceptionCode#MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED}.
+ */
+public class MaxPhoenixColumnSizeExceededException extends SQLException  {
+    private static final long serialVersionUID = 1L;
+    // Constant error code shared by both constructors; final so it cannot be reassigned.
+    private static final SQLExceptionCode CODE = SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED;
+
+    /**
+     * Creates the exception with the message built from the error code alone, without any
+     * size or row key details.
+     */
+    public MaxPhoenixColumnSizeExceededException() {
+        super(new SQLExceptionInfo.Builder(CODE).build().toString(), CODE.getSQLState(), CODE.getErrorCode(), null);
+    }
+
+    /**
+     * Creates the exception with a message that includes the allowed and actual sizes,
+     * followed by ". " and the supplied row key and column description.
+     *
+     * @param rowkeyAndColumnInfo text appended to the end of the message
+     * @param maxMutationCellSizeBytes maximum allowed column value size in bytes
+     * @param mutationCellSizeBytes actual column value size in bytes
+     */
+    public MaxPhoenixColumnSizeExceededException(String rowkeyAndColumnInfo, int maxMutationCellSizeBytes,
+                                                 int mutationCellSizeBytes) {
+        super(new SQLExceptionInfo.Builder(CODE).setMaxPhoenixColumnSizeBytes(maxMutationCellSizeBytes)
+                        .setPhoenixColumnSizeBytes(mutationCellSizeBytes).build().toString() + ". " + rowkeyAndColumnInfo,
+                CODE.getSQLState(), CODE.getErrorCode(), null);
+    }
+}

Reply via email to