This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new aac505448d PHOENIX-7318 : Support JSON_MODIFY in Upserts (#1904)
aac505448d is described below
commit aac505448df628480bc711dc2be44a644829e9b9
Author: RanganathG <[email protected]>
AuthorDate: Mon Jun 24 21:49:33 2024 +0530
PHOENIX-7318 : Support JSON_MODIFY in Upserts (#1904)
PHOENIX-7318 - Support JSON_MODIFY in Upserts
---
.../org/apache/phoenix/compile/UpsertCompiler.java | 194 +++++++------
.../java/org/apache/phoenix/parse/ParseNode.java | 23 ++
.../org/apache/phoenix/schema/types/PJson.java | 2 +-
.../phoenix/end2end/json/JsonFunctionsIT.java | 300 ++++++++++++++++++++-
4 files changed, 433 insertions(+), 86 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 890b99a90c..ccb170eec7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -17,24 +17,6 @@
*/
package org.apache.phoenix.compile;
-import static
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
-import static
org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity;
-
-import java.sql.ParameterMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
-import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
@@ -42,12 +24,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.compile.ExplainPlanAttributes
- .ExplainPlanAttributesBuilder;
+import
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import
org.apache.phoenix.coprocessorclient.UngroupedAggregateRegionObserverHelper;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -62,6 +43,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -88,6 +70,7 @@ import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.DelegateColumn;
import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
@@ -111,15 +94,30 @@ import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity;
public class UpsertCompiler {
@@ -142,6 +140,9 @@ public class UpsertCompiler {
RowTimestampColInfo rowTsColInfo = new
RowTimestampColInfo(useServerTimestamp, rowTimestamp);
for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
byte[] value = values[j];
+ if (value == null) {
+ continue;
+ }
PColumn column = table.getColumns().get(columnIndexes[i]);
if (value.length >= maxHBaseClientKeyValueSize &&
table.getImmutableStorageScheme() ==
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
@@ -830,8 +831,10 @@ public class UpsertCompiler {
final List<Expression> constantExpressions =
Lists.newArrayListWithExpectedSize(valueNodes.size());
// First build all the expressions, as with sequences we want to
collect them all first
// and initialize them in one batch
+ List<Pair<ColumnName, ParseNode>> jsonExpressions =
Lists.newArrayList();
+ List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
for (ParseNode valueNode : valueNodes) {
- if (!valueNode.isStateless()) {
+ if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException();
}
PColumn column = allColumns.get(columnIndexes[nodeIndex]);
@@ -842,11 +845,24 @@ public class UpsertCompiler {
expression.getDataType(), column.getDataType(),
"expression: "
+ expression.toString() + " in column " +
column);
}
+ if (!SchemaUtil.isPKColumn(column) &&
!valueNode.hasJsonExpression()) {
+ nonPKColumns.add(new Pair<>(
+
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+ column.getName().getString()), valueNode));
+ } else if (valueNode.hasJsonExpression()) {
+ jsonExpressions.add(new Pair<>(
+
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+ column.getName().getString()), valueNode));
+ }
constantExpressions.add(expression);
nodeIndex++;
}
+ if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
+ jsonExpressions.addAll(nonPKColumns);
+ nonPKColumns.clear();
+ }
byte[] onDupKeyBytesToBe = null;
- List<Pair<ColumnName,ParseNode>> onDupKeyPairs =
upsert.getOnDupKeyPairs();
+ List<Pair<ColumnName, ParseNode>> onDupKeyPairs =
upsert.getOnDupKeyPairs();
if (onDupKeyPairs != null) {
if (table.isImmutableRows()) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
@@ -869,60 +885,10 @@ public class UpsertCompiler {
if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
onDupKeyBytesToBe =
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
} else { // ON DUPLICATE KEY UPDATE;
- int position = table.getBucketNum() == null ? 0 : 1;
- UpdateColumnCompiler compiler = new
UpdateColumnCompiler(context);
- int nColumns = onDupKeyPairs.size();
- List<Expression> updateExpressions =
Lists.newArrayListWithExpectedSize(nColumns);
- LinkedHashSet<PColumn> updateColumns =
Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
- updateColumns.add(new PColumnImpl(
- table.getPKColumns().get(position).getName(), // Use
first PK column name as we know it won't conflict with others
- null, PVarbinary.INSTANCE, null, null, false,
position, SortOrder.getDefault(), 0, null, false, null, false, false, null,
table.getPKColumns().get(position).getTimestamp()));
- position++;
- for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
- ColumnName colName = columnPair.getFirst();
- PColumn updateColumn = resolver.resolveColumn(null,
colName.getFamilyName(), colName.getColumnName()).getColumn();
- if (SchemaUtil.isPKColumn(updateColumn)) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY)
- .setSchemaName(table.getSchemaName().getString())
- .setTableName(table.getTableName().getString())
- .setColumnName(updateColumn.getName().getString())
- .build().buildException();
- }
- final int columnPosition = position++;
- if (!updateColumns.add(new DelegateColumn(updateColumn) {
- @Override
- public int getPosition() {
- return columnPosition;
- }
- })) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY)
- .setSchemaName(table.getSchemaName().getString())
- .setTableName(table.getTableName().getString())
- .setColumnName(updateColumn.getName().getString())
- .build().buildException();
- };
- ParseNode updateNode = columnPair.getSecond();
- compiler.setColumn(updateColumn);
- Expression updateExpression = updateNode.accept(compiler);
- // Check that updateExpression is coercible to updateColumn
- if (updateExpression.getDataType() != null &&
!updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) {
- throw TypeMismatchException.newException(
- updateExpression.getDataType(),
updateColumn.getDataType(), "expression: "
- + updateExpression.toString() + " for
column " + updateColumn);
- }
- if (compiler.isAggregate()) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY)
- .setSchemaName(table.getSchemaName().getString())
- .setTableName(table.getTableName().getString())
- .setColumnName(updateColumn.getName().getString())
- .build().buildException();
- }
- updateExpressions.add(updateExpression);
- }
- PTable onDupKeyTable = PTableImpl.builderWithColumns(table,
updateColumns)
- .build();
- onDupKeyBytesToBe =
PhoenixIndexBuilderHelper.serializeOnDupKeyUpdate(onDupKeyTable,
updateExpressions);
+ onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
onDupKeyPairs, resolver);
}
+ } else if (!jsonExpressions.isEmpty()) {
+ onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
jsonExpressions, resolver);
}
final byte[] onDupKeyBytes = onDupKeyBytesToBe;
@@ -931,6 +897,72 @@ public class UpsertCompiler {
connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes,
maxSize, maxSizeBytes);
}
+ private static byte[] getOnDuplicateKeyBytes(PTable table,
StatementContext context,
+ List<Pair<ColumnName, ParseNode>> onDupKeyPairs, ColumnResolver
resolver)
+ throws SQLException {
+ byte[] onDupKeyBytesToBe;
+ int position = table.getBucketNum() == null ? 0 : 1;
+ UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
+ int nColumns = onDupKeyPairs.size();
+ List<Expression> updateExpressions =
Lists.newArrayListWithExpectedSize(nColumns);
+ LinkedHashSet<PColumn> updateColumns =
Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
+ updateColumns.add(new
PColumnImpl(table.getPKColumns().get(position).getName(),
+ // Use first PK column name as we know it won't conflict with
others
+ null, PVarbinary.INSTANCE, null, null, false, position,
SortOrder.getDefault(), 0,
+ null, false, null, false, false, null,
+ table.getPKColumns().get(position).getTimestamp()));
+ position++;
+ for (Pair<ColumnName, ParseNode> columnPair : onDupKeyPairs) {
+ ColumnName colName = columnPair.getFirst();
+ PColumn
+ updateColumn =
+ resolver.resolveColumn(null, colName.getFamilyName(),
colName.getColumnName())
+ .getColumn();
+ if (SchemaUtil.isPKColumn(updateColumn)) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY).setSchemaName(
+ table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+
.setColumnName(updateColumn.getName().getString()).build().buildException();
+ }
+ final int columnPosition = position++;
+ if (!updateColumns.add(new DelegateColumn(updateColumn) {
+ @Override
+ public int getPosition() {
+ return columnPosition;
+ }
+ })) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY).setSchemaName(
+ table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+
.setColumnName(updateColumn.getName().getString()).build().buildException();
+ }
+ ParseNode updateNode = columnPair.getSecond();
+ compiler.setColumn(updateColumn);
+ Expression updateExpression = updateNode.accept(compiler);
+ // Check that updateExpression is coercible to updateColumn
+ if (updateExpression.getDataType() != null &&
!updateExpression.getDataType()
+ .isCastableTo(updateColumn.getDataType())) {
+ throw
TypeMismatchException.newException(updateExpression.getDataType(),
+ updateColumn.getDataType(),
+ "expression: " + updateExpression + " for column " +
updateColumn);
+ }
+ if (compiler.isAggregate()) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY).setSchemaName(
+ table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+
.setColumnName(updateColumn.getName().getString()).build().buildException();
+ }
+ updateExpressions.add(updateExpression);
+ }
+ PTable onDupKeyTable = PTableImpl.builderWithColumns(table,
updateColumns).build();
+ onDupKeyBytesToBe =
+
PhoenixIndexBuilderHelper.serializeOnDupKeyUpdate(onDupKeyTable,
updateExpressions);
+ return onDupKeyBytesToBe;
+ }
+
private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable
table) {
checkArgument(table.getRowTimestampColPos() != -1, "Call this method
only for tables with row timestamp column");
int rowTimestampColPKSlot = table.getRowTimestampColPos();
@@ -1246,6 +1278,10 @@ public class UpsertCompiler {
Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
sequenceManager.newSequenceTuple(null);
for (Expression constantExpression : constantExpressions) {
+ if (!constantExpression.isStateless()) {
+ nodeIndex++;
+ continue;
+ }
PColumn column = allColumns.get(columnIndexes[nodeIndex]);
constantExpression.evaluate(tuple, ptr);
Object value = null;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
index 362e65786d..12d6737a44 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNode.java
@@ -71,6 +71,16 @@ public abstract class ParseNode {
return finder.hasSubquery;
}
+ public boolean hasJsonExpression() {
+ JsonFunctionFinder finder = new JsonFunctionFinder();
+ try {
+ this.accept(finder);
+ } catch (SQLException e) {
+ // Not possible.
+ }
+ return finder.hasJsonFunction;
+ }
+
public abstract void toSQL(ColumnResolver resolver, StringBuilder buf);
private static class SubqueryFinder extends
StatelessTraverseAllParseNodeVisitor {
@@ -82,4 +92,17 @@ public abstract class ParseNode {
return null;
}
}
+
+ private static class JsonFunctionFinder extends
StatelessTraverseAllParseNodeVisitor {
+ private boolean hasJsonFunction = false;
+ @Override
+ public boolean visitEnter(FunctionParseNode node) throws SQLException {
+ if (node instanceof JsonValueParseNode
+ || node instanceof JsonQueryParseNode
+ || node instanceof JsonModifyParseNode) {
+ hasJsonFunction = true;
+ }
+ return true;
+ }
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
index e437fb5fe2..748d5b61e9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
@@ -119,7 +119,7 @@ public class PJson extends PVarbinary {
@Override
public boolean isBytesComparableWith(@SuppressWarnings("rawtypes")
PDataType otherType) {
- return otherType == PVarbinary.INSTANCE;
+ return otherType == PVarbinary.INSTANCE || otherType == PJson.INSTANCE;
}
@Override
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
index 04c1070fe9..05458cbdde 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
@@ -169,10 +169,16 @@ public class JsonFunctionsIT extends
ParallelStatsDisabledIT {
assertEquals("[\"Sport\", \"alto1\", \"Books\"]", rs.getString(4));
assertEquals("{\"type\": 1, \"address\": {\"town\":
\"Manchester\", \"county\": \"Avon\", \"country\": \"England\", \"exists\":
true}, \"tags\": [\"Sport\", \"alto1\", \"Books\"]}", rs.getString(5));
- // Now check for empty match
- query = String.format(queryTemplate, "Windsors");
+ upsert ="UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(2,1, JSON_MODIFY(jsoncol, '$.info.address.town', '\"Manchester\"')" +
+ ") ON DUPLICATE KEY IGNORE";
+ conn.createStatement().execute(upsert);
+
+ query = "SELECT pk, col, jsoncol FROM " + tableName + " WHERE pk =
2";
rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals(1, rs.getInt(2));
+ assertEquals(null, rs.getString(3));
}
}
@@ -225,10 +231,16 @@ public class JsonFunctionsIT extends
ParallelStatsDisabledIT {
assertEquals("[\"Sport\", \"alto1\", \"Books\"]", rs.getString(4));
assertEquals("{\"type\": 1, \"address\": {\"town\":
\"Manchester\", \"county\": \"Avon\", \"country\": \"England\", \"exists\":
true}, \"tags\": [\"Sport\", \"alto1\", \"Books\"]}", rs.getString(5));
- // Now check for empty match
- query = String.format(queryTemplate, "Windsors");
+ upsert ="UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(2,1, JSON_MODIFY(jsoncol, '$.info.address.town', '\"Manchester\"')" +
+ ") ON DUPLICATE KEY IGNORE";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ query = "SELECT pk, col, jsoncol FROM " + tableName + " WHERE pk =
2";
rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals(1, rs.getInt(2));
+ assertEquals(null, rs.getString(3));
}
}
@@ -749,4 +761,280 @@ public class JsonFunctionsIT extends
ParallelStatsDisabledIT {
assertFalse(rs.next());
}
}
+
+ @Test
+ public void testUpsertJsonModifyWithAutoCommit() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String ddl = "create table " + tableName +
+ " (pk integer primary key, " +
+ "col integer, " +
+ "strcol varchar, " +
+ "strcol1 varchar, " +
+ "strcol2 varchar, " +
+ "strcol3 varchar, " +
+ "strcol4 varchar, " +
+ "strcol5 varchar, " +
+ "jsoncol json)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
tableName + " (pk,col,strcol,jsoncol) VALUES (?,?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, "");
+ stmt.setString(4, basicJson);
+ stmt.execute();
+ String upsert = "UPSERT INTO " + tableName +
"(pk,col,strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol,
'$.info.address.town'),JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ";
+ conn.createStatement().execute(upsert);
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol)
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol,
'$.info.tags[1]', '\"alto1\"')) ");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,strcol2,jsoncol,col)
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags',
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol3,jsoncol)
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName +
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol)
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ");
+ String
+ queryTemplate =
+ "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol,
'$.info.address.town'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[1]'),
JSON_QUERY(jsoncol, '$.info.tags'), " +
+ "JSON_QUERY(jsoncol, '$.info'), " +
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+ "strcol, strcol1, strcol2,strcol3, strcol4,
strcol5 " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol,
'$.name') = '%s' AND pk = 1";
+ String query = String.format(queryTemplate, "AndersenFamily");
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("Basic", rs.getString(1));
+ assertEquals("Manchester", rs.getString(2));
+ assertEquals("alto1", rs.getString(3));
+ assertEquals("UpsertSelectVal2", rs.getString(6));
+ assertEquals(3, rs.getInt(7));
+ assertEquals("Bristol", rs.getString(8));
+ assertEquals("Water polo", rs.getString(9));
+ assertEquals("Basic", rs.getString(10));
+ assertEquals("Books", rs.getString(11));
+ assertEquals("UpsertSelectVal", rs.getString(12));
+ assertEquals("UpsertSelectVal", rs.getString(13));
+
+ query = "SELECT pk, col, strcol1, jsoncol FROM " + tableName + "
WHERE pk = 2";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(2));
+ assertEquals("Hello", rs.getString(3));
+ assertEquals(null, rs.getString(4));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol,
'$.info.address.town', '\"ShouldUpdate\"')";
+ conn.createStatement().execute(upsert);
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY IGNORE";
+ conn.createStatement().execute(upsert);
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+ }
+ }
+
+ @Test
+ public void testUpsertJsonModifyWithOutAutoCommit() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName +
+ " (pk integer primary key, " +
+ "col integer, " +
+ "strcol varchar, " +
+ "strcol1 varchar, " +
+ "strcol2 varchar, " +
+ "strcol3 varchar, " +
+ "strcol4 varchar, " +
+ "strcol5 varchar, " +
+ "jsoncol json)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
tableName + " (pk,col,strcol,jsoncol) VALUES (?,?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, "");
+ stmt.setString(4, basicJson);
+ stmt.execute();
+ conn.commit();
+ String upsert = "UPSERT INTO " + tableName +
"(pk,col,strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol,
'$.info.address.town'),JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ";
+ conn.createStatement().execute(upsert);
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol)
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol,
'$.info.tags[1]', '\"alto1\"')) ");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,strcol2,jsoncol,col)
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags',
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol3,jsoncol)
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName +
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,strcol1,jsoncol)
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ");
+ conn.commit();
+ String
+ queryTemplate =
+ "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol,
'$.info.address.town'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[1]'),
JSON_QUERY(jsoncol, '$.info.tags'), " +
+ "JSON_QUERY(jsoncol, '$.info'), " +
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+ "strcol, strcol1, strcol2,strcol3, strcol4,
strcol5 " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol,
'$.name') = '%s' AND pk = 1";
+ String query = String.format(queryTemplate, "AndersenFamily");
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("Basic", rs.getString(1));
+ assertEquals("Manchester", rs.getString(2));
+ assertEquals("alto1", rs.getString(3));
+ assertEquals("UpsertSelectVal2", rs.getString(6));
+ assertEquals(3, rs.getInt(7));
+ assertEquals("Bristol", rs.getString(8));
+ assertEquals("Water polo", rs.getString(9));
+ assertEquals("Basic", rs.getString(10));
+ assertEquals("Books", rs.getString(11));
+ assertEquals("UpsertSelectVal", rs.getString(12));
+ assertEquals("UpsertSelectVal", rs.getString(13));
+
+ query = "SELECT pk, col, strcol1, jsoncol FROM " + tableName + "
WHERE pk = 2";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(2));
+ assertEquals("Hello", rs.getString(3));
+ assertEquals(null, rs.getString(4));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol,
'$.info.address.town', '\"ShouldUpdate\"')";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY IGNORE";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+ }
+ }
+
+ @Test
+ public void testUpsertJsonModifyMultipleCF() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName +
+ " (pk integer primary key, " +
+ "col integer, " +
+ "a.strcol varchar, " +
+ "a.strcol1 varchar, " +
+ "b.strcol varchar, " +
+ "b.strcol1 varchar, " +
+ "strcol4 varchar, " +
+ "strcol5 varchar, " +
+ "jsoncol json)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
tableName + " (pk,col,a.strcol,jsoncol) VALUES (?,?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, "");
+ stmt.setString(4, basicJson);
+ stmt.execute();
+ conn.commit();
+ String upsert = "UPSERT INTO " + tableName +
"(pk,col,a.strcol,jsoncol) VALUES(1,2,JSON_VALUE(jsoncol,
'$.info.address.town') || 'City',JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ";
+ conn.createStatement().execute(upsert);
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,a.strcol1,jsoncol)
VALUES(1,2,JSON_VALUE(jsoncol, '$.info.tags[1]'),JSON_MODIFY(jsoncol,
'$.info.tags[1]', '\"alto1\"')) ");
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,b.strcol,jsoncol,col)
VALUES(1,JSON_VALUE(jsoncol, '$.type'),JSON_MODIFY(jsoncol, '$.info.tags',
'[\"Sport\", \"alto1\", \"Books\"]'),3) ");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,b.strcol1,jsoncol)
SELECT pk, col, JSON_VALUE(jsoncol, '$.info.tags[2]') ,JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName +
"(pk,col,strcol4,strcol5,jsoncol) SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.tags[2]'),JSON_VALUE(jsoncol, '$.info.tags[2]'),JSON_MODIFY(jsoncol,
'$.info.tags[2]', '\"UpsertSelectVal2\"') from "
+ + tableName + " WHERE pk = 1");
+ conn.commit();
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + "(pk,col,a.strcol1,jsoncol)
VALUES(2,1,'Hello',JSON_MODIFY(jsoncol, '$.info.address.town',
'\"Manchester\"')) ");
+ conn.commit();
+ String
+ queryTemplate =
+ "SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol,
'$.info.address.town'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[1]'),
JSON_QUERY(jsoncol, '$.info.tags'), " +
+ "JSON_QUERY(jsoncol, '$.info'), " +
"JSON_VALUE(jsoncol, '$.info.tags[2]'), col, " +
+ "a.strcol, a.strcol1, b.strcol,b.strcol1, strcol4,
strcol5 " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol,
'$.name') = '%s' AND pk = 1";
+ String query = String.format(queryTemplate, "AndersenFamily");
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("Basic", rs.getString(1));
+ assertEquals("Manchester", rs.getString(2));
+ assertEquals("alto1", rs.getString(3));
+ assertEquals("UpsertSelectVal2", rs.getString(6));
+ assertEquals(3, rs.getInt(7));
+ assertEquals("BristolCity", rs.getString(8));
+ assertEquals("Water polo", rs.getString(9));
+ assertEquals("Basic", rs.getString(10));
+ assertEquals("Books", rs.getString(11));
+ assertEquals("UpsertSelectVal", rs.getString(12));
+ assertEquals("UpsertSelectVal", rs.getString(13));
+
+ query = "SELECT pk, col, a.strcol1, jsoncol FROM " + tableName + "
WHERE pk = 2";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(2));
+ assertEquals("Hello", rs.getString(3));
+ assertEquals(null, rs.getString(4));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY UPDATE jsoncol = JSON_MODIFY(jsoncol,
'$.info.address.town', '\"ShouldUpdate\"')";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+
+ upsert =
+ "UPSERT INTO " + tableName + " (pk, col, jsoncol)
VALUES(1,4, JSON_MODIFY(jsoncol, '$.info.address.town', '\"ShouldBeIgnore\"')"
+ ") ON DUPLICATE KEY IGNORE";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ query =
+ "SELECT pk, col, JSON_VALUE(jsoncol,
'$.info.address.town') FROM " + tableName + " WHERE pk = 1";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(2));
+ assertEquals("ShouldUpdate", rs.getString(3));
+ }
+ }
}