This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new aac505448d PHOENIX-7318 : Support JSON_MODIFY in Upserts (#1904)
aac505448d is described below

commit aac505448df628480bc711dc2be44a644829e9b9
Author: RanganathG <[email protected]>
AuthorDate: Mon Jun 24 21:49:33 2024 +0530

    PHOENIX-7318 : Support JSON_MODIFY in Upserts (#1904)
    
    PHOENIX-7318 - Support JSON_MODIFY in Upserts
---
 .../org/apache/phoenix/compile/UpsertCompiler.java | 194 +++++++------
 .../java/org/apache/phoenix/parse/ParseNode.java   |  23 ++
 .../org/apache/phoenix/schema/types/PJson.java     |   2 +-
 .../phoenix/end2end/json/JsonFunctionsIT.java      | 300 ++++++++++++++++++++-
 4 files changed, 433 insertions(+), 86 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 890b99a90c..ccb170eec7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -17,24 +17,6 @@
  */
 package org.apache.phoenix.compile;
 
-import static 
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity;
-
-import java.sql.ParameterMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
-import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
@@ -42,12 +24,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
+import 
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import 
org.apache.phoenix.coprocessorclient.UngroupedAggregateRegionObserverHelper;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -62,6 +43,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -88,6 +70,7 @@ import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -111,15 +94,30 @@ import org.apache.phoenix.schema.types.PSmallint;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity;
 
 public class UpsertCompiler {
 
@@ -142,6 +140,9 @@ public class UpsertCompiler {
         RowTimestampColInfo rowTsColInfo = new 
RowTimestampColInfo(useServerTimestamp, rowTimestamp);
         for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
             byte[] value = values[j];
+            if (value == null) {
+                continue;
+            }
             PColumn column = table.getColumns().get(columnIndexes[i]);
             if (value.length >= maxHBaseClientKeyValueSize &&
                     table.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
@@ -830,8 +831,10 @@ public class UpsertCompiler {
         final List<Expression> constantExpressions = 
Lists.newArrayListWithExpectedSize(valueNodes.size());
         // First build all the expressions, as with sequences we want to 
collect them all first
         // and initialize them in one batch
+        List<Pair<ColumnName, ParseNode>> jsonExpressions = 
Lists.newArrayList();
+        List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
         for (ParseNode valueNode : valueNodes) {
-            if (!valueNode.isStateless()) {
+            if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException();
             }
             PColumn column = allColumns.get(columnIndexes[nodeIndex]);
@@ -842,11 +845,24 @@ public class UpsertCompiler {
                         expression.getDataType(), column.getDataType(), 
"expression: "
                                 + expression.toString() + " in column " + 
column);
             }
+            if (!SchemaUtil.isPKColumn(column) && 
!valueNode.hasJsonExpression()) {
+                nonPKColumns.add(new Pair<>(
+                        
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+                                column.getName().getString()), valueNode));
+            } else if (valueNode.hasJsonExpression()) {
+                jsonExpressions.add(new Pair<>(
+                        
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+                                column.getName().getString()), valueNode));
+            }
             constantExpressions.add(expression);
             nodeIndex++;
         }
+        if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
+            jsonExpressions.addAll(nonPKColumns);
+            nonPKColumns.clear();
+        }
         byte[] onDupKeyBytesToBe = null;
-        List<Pair<ColumnName,ParseNode>> onDupKeyPairs = 
upsert.getOnDupKeyPairs();
+        List<Pair<ColumnName, ParseNode>> onDupKeyPairs = 
upsert.getOnDupKeyPairs();
         if (onDupKeyPairs != null) {
             if (table.isImmutableRows()) {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
@@ -869,60 +885,10 @@ public class UpsertCompiler {
             if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
                 onDupKeyBytesToBe = 
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
             } else {                       // ON DUPLICATE KEY UPDATE;
-                int position = table.getBucketNum() == null ? 0 : 1;
-                UpdateColumnCompiler compiler = new 
UpdateColumnCompiler(context);
-                int nColumns = onDupKeyPairs.size();
-                List<Expression> updateExpressions = 
Lists.newArrayListWithExpectedSize(nColumns);
-                LinkedHashSet<PColumn> updateColumns = 
Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
-                updateColumns.add(new PColumnImpl(
-                        table.getPKColumns().get(position).getName(), // Use 
first PK column name as we know it won't conflict with others
-                        null, PVarbinary.INSTANCE, null, null, false, 
position, SortOrder.getDefault(), 0, null, false, null, false, false, null, 
table.getPKColumns().get(position).getTimestamp()));
-                position++;
-                for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
-                    ColumnName colName = columnPair.getFirst();
-                    PColumn updateColumn = resolver.resolveColumn(null, 
colName.getFamilyName(), colName.getColumnName()).getColumn();
-                    if (SchemaUtil.isPKColumn(updateColumn)) {
-                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY)
-                        .setSchemaName(table.getSchemaName().getString())
-                        .setTableName(table.getTableName().getString())
-                        .setColumnName(updateColumn.getName().getString())
-                        .build().buildException();
-                    }
-                    final int columnPosition = position++;
-                    if (!updateColumns.add(new DelegateColumn(updateColumn) {
-                        @Override
-                        public int getPosition() {
-                            return columnPosition;
-                        }
-                    })) {
-                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY)
-                            .setSchemaName(table.getSchemaName().getString())
-                            .setTableName(table.getTableName().getString())
-                            .setColumnName(updateColumn.getName().getString())
-                            .build().buildException();
-                    };
-                    ParseNode updateNode = columnPair.getSecond();
-                    compiler.setColumn(updateColumn);
-                    Expression updateExpression = updateNode.accept(compiler);
-                    // Check that updateExpression is coercible to updateColumn
-                    if (updateExpression.getDataType() != null && 
!updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) {
-                        throw TypeMismatchException.newException(
-                                updateExpression.getDataType(), 
updateColumn.getDataType(), "expression: "
-                                        + updateExpression.toString() + " for 
column " + updateColumn);
-                    }
-                    if (compiler.isAggregate()) {
-                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY)
-                            .setSchemaName(table.getSchemaName().getString())
-                            .setTableName(table.getTableName().getString())
-                            .setColumnName(updateColumn.getName().getString())
-                            .build().buildException();
-                    }
-                    updateExpressions.add(updateExpression);
-                }
-                PTable onDupKeyTable = PTableImpl.builderWithColumns(table, 
updateColumns)
-                        .build();
-                onDupKeyBytesToBe = 
PhoenixIndexBuilderHelper.serializeOnDupKeyUpdate(onDupKeyTable, 
updateExpressions);
+                onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
onDupKeyPairs, resolver);
             }
+        } else if (!jsonExpressions.isEmpty()) {
+            onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
jsonExpressions, resolver);
         }
         final byte[] onDupKeyBytes = onDupKeyBytesToBe;
         
@@ -931,6 +897,72 @@ public class UpsertCompiler {
                 connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, 
maxSize, maxSizeBytes);
     }
 
+    private static byte[] getOnDuplicateKeyBytes(PTable table, 
StatementContext context,
+            List<Pair<ColumnName, ParseNode>> onDupKeyPairs, ColumnResolver 
resolver)
+            throws SQLException {
+        byte[] onDupKeyBytesToBe;
+        int position = table.getBucketNum() == null ? 0 : 1;
+        UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
+        int nColumns = onDupKeyPairs.size();
+        List<Expression> updateExpressions = 
Lists.newArrayListWithExpectedSize(nColumns);
+        LinkedHashSet<PColumn> updateColumns = 
Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
+        updateColumns.add(new 
PColumnImpl(table.getPKColumns().get(position).getName(),
+                // Use first PK column name as we know it won't conflict with 
others
+                null, PVarbinary.INSTANCE, null, null, false, position, 
SortOrder.getDefault(), 0,
+                null, false, null, false, false, null,
+                table.getPKColumns().get(position).getTimestamp()));
+        position++;
+        for (Pair<ColumnName, ParseNode> columnPair : onDupKeyPairs) {
+            ColumnName colName = columnPair.getFirst();
+            PColumn
+                    updateColumn =
+                    resolver.resolveColumn(null, colName.getFamilyName(), 
colName.getColumnName())
+                            .getColumn();
+            if (SchemaUtil.isPKColumn(updateColumn)) {
+                throw new SQLExceptionInfo.Builder(
+                        
SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY).setSchemaName(
+                                table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        
.setColumnName(updateColumn.getName().getString()).build().buildException();
+            }
+            final int columnPosition = position++;
+            if (!updateColumns.add(new DelegateColumn(updateColumn) {
+                @Override
+                public int getPosition() {
+                    return columnPosition;
+                }
+            })) {
+                throw new SQLExceptionInfo.Builder(
+                        
SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY).setSchemaName(
+                                table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        
.setColumnName(updateColumn.getName().getString()).build().buildException();
+            }
+            ParseNode updateNode = columnPair.getSecond();
+            compiler.setColumn(updateColumn);
+            Expression updateExpression = updateNode.accept(compiler);
+            // Check that updateExpression is coercible to updateColumn
+            if (updateExpression.getDataType() != null && 
!updateExpression.getDataType()
+                    .isCastableTo(updateColumn.getDataType())) {
+                throw 
TypeMismatchException.newException(updateExpression.getDataType(),
+                        updateColumn.getDataType(),
+                        "expression: " + updateExpression + " for column " + 
updateColumn);
+            }
+            if (compiler.isAggregate()) {
+                throw new SQLExceptionInfo.Builder(
+                        
SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY).setSchemaName(
+                                table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        
.setColumnName(updateColumn.getName().getString()).build().buildException();
+            }
+            updateExpressions.add(updateExpression);
+        }
+        PTable onDupKeyTable = PTableImpl.builderWithColumns(table, 
updateColumns).build();
+        onDupKeyBytesToBe =
+                
PhoenixIndexBuilderHelper.serializeOnDupKeyUpdate(onDupKeyTable, 
updateExpressions);
+        return onDupKeyBytesToBe;
+    }
+
     private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable 
table) {
         checkArgument(table.getRowTimestampColPos() != -1, "Call this method 
only for tables with row timestamp column");
         int rowTimestampColPKSlot = table.getRowTimestampColPos();
@@ -1246,6 +1278,10 @@ public class UpsertCompiler {
             Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
                 sequenceManager.newSequenceTuple(null);
             for (Expression constantExpression : constantExpressions) {
+                if (!constantExpression.isStateless()) {
+                    nodeIndex++;
+                    continue;
+                }
                 PColumn column = allColumns.get(columnIndexes[nodeIndex]);
                 constantExpression.evaluate(tuple, ptr);
                 Object value = null;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
index 362e65786d..12d6737a44 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
@@ -71,6 +71,16 @@ public abstract class ParseNode {
         return finder.hasSubquery;
     }
 
+    public boolean hasJsonExpression() {
+        JsonFunctionFinder finder = new JsonFunctionFinder();
+        try {
+            this.accept(finder);
+        } catch (SQLException e) {
+            // Not possible.
+        }
+        return finder.hasJsonFunction;
+    }
+
     public abstract void toSQL(ColumnResolver resolver, StringBuilder buf);
 
     private static class SubqueryFinder extends 
StatelessTraverseAllParseNodeVisitor {
@@ -82,4 +92,17 @@ public abstract class ParseNode {
             return null;
         }
     }
+
+    private static class JsonFunctionFinder extends 
StatelessTraverseAllParseNodeVisitor {
+        private boolean hasJsonFunction = false;
+        @Override
+        public boolean visitEnter(FunctionParseNode node) throws SQLException {
+            if (node instanceof JsonValueParseNode
+                    || node instanceof JsonQueryParseNode
+                    || node instanceof JsonModifyParseNode) {
+                hasJsonFunction = true;
+            }
+            return true;
+        }
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
index e437fb5fe2..748d5b61e9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
@@ -119,7 +119,7 @@ public class PJson extends PVarbinary {
 
     @Override
     public boolean isBytesComparableWith(@SuppressWarnings("rawtypes") 
PDataType otherType) {
-        return otherType == PVarbinary.INSTANCE;
+        return otherType == PVarbinary.INSTANCE || otherType == PJson.INSTANCE;
     }
 
     @Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
index 04c1070fe9..05458cbdde 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
@@ -169,10 +169,16 @@ public class JsonFunctionsIT extends 
ParallelStatsDisabledIT {
             assertEquals("[\"Sport\", \"alto1\", \"Books\"]", rs.getString(4));
             assertEquals("{\"type\": 1, \"address\": {\"town\": 
\"Manchester\", \"county\": \"Avon\", \"country\": \"England\", \"exists\": 
true}, \"tags\": [\"Sport\", \"alto1\", \"Books\"]}", rs.getString(5));
 
-            // Now check for empty match
-            query = String.format(queryTemplate, "Windsors");
+            upsert ="UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(2,1, JSON_MODIFY(jsoncol, '$.info.address.town', '\"Manchester\"')" +
+                    ") ON DUPLICATE KEY IGNORE";
+            conn.createStatement().execute(upsert);
+
+            query = "SELECT pk, col, jsoncol FROM " + tableName + " WHERE pk = 
2";
             rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals(null, rs.getString(3));
         }
     }
 
@@ -225,10 +231,16 @@ public class JsonFunctionsIT extends 
ParallelStatsDisabledIT {
             assertEquals("[\"Sport\", \"alto1\", \"Books\"]", rs.getString(4));
             assertEquals("{\"type\": 1, \"address\": {\"town\": 
\"Manchester\", \"county\": \"Avon\", \"country\": \"England\", \"exists\": 
true}, \"tags\": [\"Sport\", \"alto1\", \"Books\"]}", rs.getString(5));
 
-            // Now check for empty match
-            query = String.format(queryTemplate, "Windsors");
+            upsert ="UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(2,1, JSON_MODIFY(jsoncol, '$.info.address.town', '\"Manchester\"')" +
+                    ") ON DUPLICATE KEY IGNORE";
+            conn.createStatement().execute(upsert);
+            conn.commit();
+            query = "SELECT pk, col, jsoncol FROM " + tableName + " WHERE pk = 
2";
             rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals(null, rs.getString(3));
         }
     }
 
@@ -749,4 +761,280 @@ public class JsonFunctionsIT extends 
ParallelStatsDisabledIT {
             assertFalse(rs.next());
         }
     }
+
+    @Test
+    public void testUpsertJsonModifyWithAutoCommit() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String ddl = "create table " + tableName +
+                    " (pk integer primary key, " +
+                    "col integer, " +
+                    "strcol varchar, " +
+                    "strcol1 varchar, " +
+                    "strcol2 varchar, " +
+                    "strcol3 varchar, " +
+                    "strcol4 varchar, " +
+                    "strcol5 varchar, " +
+                    "jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (pk,col,strcol,jsoncol) VALUES (?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, "");
+            stmt.setString(4, basicJson);
+            stmt.execute();
+            String upsert = "UPSERT INTO " + tableName + 
"(pk,col,strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol, 
'$.info.address.town'),JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ";
+            conn.createStatement().execute(upsert);
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol) 
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol, 
'$.info.tags[1]', '\"alto1\"')) ");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,strcol2,jsoncol,col) 
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags', 
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol3,jsoncol) 
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + 
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol) 
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ");
+            String
+                    queryTemplate =
+                    "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, 
'$.info.address.town'), " +
+                            "JSON_VALUE(jsoncol, '$.info.tags[1]'), 
JSON_QUERY(jsoncol, '$.info.tags'), " +
+                            "JSON_QUERY(jsoncol, '$.info'), " + 
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+                            "strcol, strcol1, strcol2,strcol3, strcol4, 
strcol5 " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, 
'$.name') = '%s' AND pk = 1";
+            String query = String.format(queryTemplate, "AndersenFamily");
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("Basic", rs.getString(1));
+            assertEquals("Manchester", rs.getString(2));
+            assertEquals("alto1", rs.getString(3));
+            assertEquals("UpsertSelectVal2", rs.getString(6));
+            assertEquals(3, rs.getInt(7));
+            assertEquals("Bristol", rs.getString(8));
+            assertEquals("Water polo", rs.getString(9));
+            assertEquals("Basic", rs.getString(10));
+            assertEquals("Books", rs.getString(11));
+            assertEquals("UpsertSelectVal", rs.getString(12));
+            assertEquals("UpsertSelectVal", rs.getString(13));
+
+            query = "SELECT pk, col, strcol1, jsoncol FROM " + tableName + " 
WHERE pk = 2";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(2));
+            assertEquals("Hello", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol, 
'$.info.address.town', '\"ShouldUpdate\"')";
+            conn.createStatement().execute(upsert);
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY IGNORE";
+            conn.createStatement().execute(upsert);
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+        }
+    }
+
+    @Test
+    public void testUpsertJsonModifyWithOutAutoCommit() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName +
+                    " (pk integer primary key, " +
+                    "col integer, " +
+                    "strcol varchar, " +
+                    "strcol1 varchar, " +
+                    "strcol2 varchar, " +
+                    "strcol3 varchar, " +
+                    "strcol4 varchar, " +
+                    "strcol5 varchar, " +
+                    "jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (pk,col,strcol,jsoncol) VALUES (?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, "");
+            stmt.setString(4, basicJson);
+            stmt.execute();
+            conn.commit();
+            String upsert = "UPSERT INTO " + tableName + 
"(pk,col,strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol, 
'$.info.address.town'),JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ";
+            conn.createStatement().execute(upsert);
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol) 
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol, 
'$.info.tags[1]', '\"alto1\"')) ");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,strcol2,jsoncol,col) 
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags', 
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+            conn.commit();
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol3,jsoncol) 
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.commit();
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + 
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.commit();
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol) 
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ");
+            conn.commit();
+            String
+                    queryTemplate =
+                    "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, 
'$.info.address.town'), " +
+                            "JSON_VALUE(jsoncol, '$.info.tags[1]'), 
JSON_QUERY(jsoncol, '$.info.tags'), " +
+                            "JSON_QUERY(jsoncol, '$.info'), " + 
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+                            "strcol, strcol1, strcol2,strcol3, strcol4, 
strcol5 " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, 
'$.name') = '%s' AND pk = 1";
+            String query = String.format(queryTemplate, "AndersenFamily");
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("Basic", rs.getString(1));
+            assertEquals("Manchester", rs.getString(2));
+            assertEquals("alto1", rs.getString(3));
+            assertEquals("UpsertSelectVal2", rs.getString(6));
+            assertEquals(3, rs.getInt(7));
+            assertEquals("Bristol", rs.getString(8));
+            assertEquals("Water polo", rs.getString(9));
+            assertEquals("Basic", rs.getString(10));
+            assertEquals("Books", rs.getString(11));
+            assertEquals("UpsertSelectVal", rs.getString(12));
+            assertEquals("UpsertSelectVal", rs.getString(13));
+
+            query = "SELECT pk, col, strcol1, jsoncol FROM " + tableName + " 
WHERE pk = 2";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(2));
+            assertEquals("Hello", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol, 
'$.info.address.town', '\"ShouldUpdate\"')";
+            conn.createStatement().execute(upsert);
+            conn.commit();
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY IGNORE";
+            conn.createStatement().execute(upsert);
+            conn.commit();
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+        }
+    }
+
+    /**
+     * Exercises JSON_MODIFY (together with JSON_VALUE) inside UPSERT VALUES,
+     * UPSERT SELECT and ON DUPLICATE KEY clauses on a table whose varchar
+     * columns are declared under the column-family prefixes {@code a.} and
+     * {@code b.} as well as without a prefix.
+     *
+     * The expected values asserted below imply that, within a single UPSERT,
+     * JSON_VALUE(jsoncol, ...) yields the value jsoncol had before the
+     * JSON_MODIFY in the same statement was applied.
+     *
+     * NOTE(review): the literal expectations ("Basic", "Bristol...",
+     * "Water polo", "Books", "AndersenFamily") depend on the contents of
+     * {@code basicJson}, which is defined outside this method - confirm there.
+     *
+     * @throws Exception if any JDBC call fails
+     */
+    @Test
+    public void testUpsertJsonModifyMultipleCF() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            // Same-named columns (strcol, strcol1) exist under both the a. and b. prefixes.
+            String ddl = "create table " + tableName +
+                    " (pk integer primary key, " +
+                    "col integer, " +
+                    "a.strcol varchar, " +
+                    "a.strcol1 varchar, " +
+                    "b.strcol varchar, " +
+                    "b.strcol1 varchar, " +
+                    "strcol4 varchar, " +
+                    "strcol5 varchar, " +
+                    "jsoncol json)";
+            conn.createStatement().execute(ddl);
+            // Seed row pk=1 with the base JSON document and an empty a.strcol.
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (pk,col,a.strcol,jsoncol) VALUES (?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, "");
+            stmt.setString(4, basicJson);
+            stmt.execute();
+            conn.commit();
+            // UPSERT VALUES: a.strcol takes the current town concatenated with 'City'
+            // while the same statement rewrites the town to "Manchester".
+            String upsert = "UPSERT INTO " + tableName + 
"(pk,col,a.strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol, 
'$.info.address.town') || 'City',JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ";
+            conn.createStatement().execute(upsert);
+            // The next two upserts run with no commit in between; the second one
+            // lists its target columns in a different order (col last).
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,a.strcol1,jsoncol) 
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol, 
'$.info.tags[1]', '\"alto1\"')) ");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,b.strcol,jsoncol,col) 
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags', 
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+            conn.commit();
+            // UPSERT SELECT: b.strcol1 captures tags[2] and the same statement
+            // replaces tags[2] with "UpsertSelectVal".
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,b.strcol1,jsoncol) 
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.commit();
+            // UPSERT SELECT with two JSON_VALUE projections of the same path
+            // (into strcol4 and strcol5) alongside a JSON_MODIFY of that path.
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + 
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol, 
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+                            + tableName + " WHERE pk = 1");
+            conn.commit();
+            // pk=2 was never inserted before this statement, so JSON_MODIFY has no
+            // existing document to work on; jsoncol is asserted to be null further down.
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + "(pk,col,a.strcol1,jsoncol) 
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town', 
'\"Manchester\"')) ");
+            conn.commit();
+            // Result columns 4 and 5 (the JSON_QUERY projections) are selected
+            // but not asserted on below.
+            String
+                    queryTemplate =
+                    "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, 
'$.info.address.town'), " +
+                            "JSON_VALUE(jsoncol, '$.info.tags[1]'), 
JSON_QUERY(jsoncol, '$.info.tags'), " +
+                            "JSON_QUERY(jsoncol, '$.info'), " + 
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+                            "a.strcol, a.strcol1, b.strcol,b.strcol1, strcol4, 
strcol5 " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, 
'$.name') = '%s' AND pk = 1";
+            String query = String.format(queryTemplate, "AndersenFamily");
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            // Columns 1-3 and 6 read the JSON document after all modifications.
+            assertEquals("Basic", rs.getString(1));
+            assertEquals("Manchester", rs.getString(2));
+            assertEquals("alto1", rs.getString(3));
+            assertEquals("UpsertSelectVal2", rs.getString(6));
+            assertEquals(3, rs.getInt(7));
+            // Columns 8-13 are the varchar columns, each expected to hold the JSON
+            // value as it was before the JSON_MODIFY in its own upsert.
+            assertEquals("BristolCity", rs.getString(8));
+            assertEquals("Water polo", rs.getString(9));
+            assertEquals("Basic", rs.getString(10));
+            assertEquals("Books", rs.getString(11));
+            assertEquals("UpsertSelectVal", rs.getString(12));
+            assertEquals("UpsertSelectVal", rs.getString(13));
+
+            // Row pk=2: plain columns are stored, jsoncol is expected to be null.
+            query = "SELECT pk, col, a.strcol1, jsoncol FROM " + tableName + " 
WHERE pk = 2";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(2));
+            assertEquals("Hello", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+
+            // ON DUPLICATE KEY UPDATE on the existing row pk=1: the assertions expect
+            // the VALUES list to be disregarded (col stays 3, not 4) and only the
+            // UPDATE clause's JSON_MODIFY to take effect.
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol, 
'$.info.address.town', '\"ShouldUpdate\"')";
+            conn.createStatement().execute(upsert);
+            conn.commit();
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+
+            // ON DUPLICATE KEY IGNORE on the existing row pk=1: the row is expected
+            // to be left exactly as the previous step left it.
+            upsert =
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) 
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')" 
+ ") ON DUPLICATE KEY IGNORE";
+            conn.createStatement().execute(upsert);
+            conn.commit();
+            query =
+                    "SELECT pk, col, JSON_VALUE(jsoncol, 
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(2));
+            assertEquals("ShouldUpdate", rs.getString(3));
+        }
+    }
 }


Reply via email to