This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

The following commit(s) were added to refs/heads/master by this push:
     new b094e82ad0 PHOENIX-7641 Support placeholder for document field keys in BSON condition expression (#2193)
b094e82ad0 is described below

commit b094e82ad0c0b4d7f8793d3c426986ebf413c916
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Tue Jun 17 19:22:44 2025 -0700

    PHOENIX-7641 Support placeholder for document field keys in BSON condition expression (#2193)
---
 .../src/main/antlr3/PhoenixBsonExpression.g | 47 +-
 .../function/BsonConditionExpressionFunction.java | 4 +-
 .../util/bson/SQLComparisonExpressionUtils.java | 122 +++-
 .../java/org/apache/phoenix/end2end/Bson5IT.java | 709 +++++++++++++++++++++
 .../util/bson/ComparisonExpressionUtils2Test.java | 318 +++++++++
 .../util/bson/ComparisonExpressionUtilsTest.java | 7 +
 6 files changed, 1180 insertions(+), 27 deletions(-)

diff --git a/phoenix-core-client/src/main/antlr3/PhoenixBsonExpression.g b/phoenix-core-client/src/main/antlr3/PhoenixBsonExpression.g
index 03ad3e53ea..537b4c4259 100644
--- a/phoenix-core-client/src/main/antlr3/PhoenixBsonExpression.g
+++ b/phoenix-core-client/src/main/antlr3/PhoenixBsonExpression.g
@@ -211,10 +211,8 @@ and_expression returns [ParseNode ret] not_expression returns [ParseNode ret] : (NOT? boolean_expression ) => n=NOT? e=boolean_expression { $ret = n == null ? e : factory.not(e); } | n=NOT? LPAREN e=expression RPAREN { $ret = n == null ? 
e : factory.not(e); } - | (ATTR | FIELD) ( LPAREN t=DOCUMENT_FIELD RPAREN {$ret = factory.documentFieldExists( - factory.literal(t.getText()), true); } ) - | (ATTR_NOT | FIELD_NOT) ( LPAREN t=DOCUMENT_FIELD RPAREN {$ret = factory.documentFieldExists( - factory.literal(t.getText()), false); } ) + | (ATTR | FIELD) ( LPAREN t=literal RPAREN {$ret = factory.documentFieldExists(t, true); } ) + | (ATTR_NOT | FIELD_NOT) ( LPAREN t=literal RPAREN {$ret = factory.documentFieldExists(t, false); } ) ; comparison_op returns [CompareOperator ret] @@ -280,6 +278,9 @@ literal returns [LiteralParseNode ret] | i=DOCUMENT_FIELD { ret = factory.literal(i.getText()); } + | j=HASH_DOCUMENT_FIELD { + ret = factory.literal(j.getText()); + } | s=STRING_LITERAL { ret = factory.literal(s.getText()); } @@ -410,22 +411,48 @@ DIVIDE : '/' ; +DOT + : '.' + ; + +PERCENT + : '%' + ; + +UNDERSCORE + : '_' + ; + +HASH + : '#' + ; + + BIND_VALUE @init{ StringBuilder sb = new StringBuilder(); } - : ( COLON { sb.append(":"); } | '$' { sb.append("$"); } | '#' { sb.append("#"); } ) + : ( COLON { sb.append(":"); } | '$' { sb.append("$"); } ) ( t=BIND_VALUE_CHARS { sb.append(t.getText()); } )+ { setText(sb.toString()); } ; +HASH_DOCUMENT_FIELD +@init{ StringBuilder sb = new StringBuilder(); } + : ( a=HASH { sb.append(a.getText()); } ) + ( h=HASH { sb.append(h.getText()); } + | t=DOCUMENT_FIELD { sb.append(t.getText()); } + )+ + { setText(sb.toString()); } + ; + DOCUMENT_FIELD @init{ StringBuilder sb = new StringBuilder(); } : (v1=LETTER { sb.append(v1.getText()); } | v2=DIGIT { sb.append(v2.getText()); } - | '_' { sb.append("_"); } - | '-' { sb.append("-"); } - | LSQUARE { sb.append("["); } - | RSQUARE { sb.append("]"); } - | '.' 
{ sb.append("."); } + | v3=UNDERSCORE { sb.append(v3.getText()); } + | v4=LSQUARE { sb.append(v4.getText()); } + | v5=RSQUARE { sb.append(v5.getText()); } + | v6=DOT { sb.append(v6.getText()); } + | v8=MINUS { sb.append(v8.getText()); } )+ { setText(sb.toString()); } ; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/BsonConditionExpressionFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/BsonConditionExpressionFunction.java index 5c579e9507..91dc692e1c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/BsonConditionExpressionFunction.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/BsonConditionExpressionFunction.java @@ -102,13 +102,15 @@ public class BsonConditionExpressionFunction extends ScalarFunction { BsonValue conditionExp = conditionExpressionBsonDoc.get("$EXPR"); BsonValue exprValues = conditionExpressionBsonDoc.get("$VAL"); + BsonValue keyAlias = conditionExpressionBsonDoc.get("$KEYS"); if (conditionExp != null && exprValues != null) { if (conditionExp.isString() && exprValues.isDocument()) { boolean result = SQLComparisonExpressionUtils .evaluateConditionExpression( ((BsonString) conditionExp).getValue(), rawBsonDocument, - (BsonDocument) exprValues); + (BsonDocument) exprValues, + keyAlias == null ? 
null : (BsonDocument) keyAlias); ptr.set(PBoolean.INSTANCE.toBytes(result)); return true; } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/util/bson/SQLComparisonExpressionUtils.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/util/bson/SQLComparisonExpressionUtils.java index a32631a4aa..c4faa88275 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/util/bson/SQLComparisonExpressionUtils.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/util/bson/SQLComparisonExpressionUtils.java @@ -34,12 +34,15 @@ import org.apache.phoenix.parse.NotParseNode; import org.apache.phoenix.parse.OrParseNode; import org.apache.phoenix.parse.ParseNode; import org.bson.BsonDocument; +import org.bson.BsonString; import org.bson.BsonValue; import org.bson.RawBsonDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -70,7 +73,30 @@ public final class SQLComparisonExpressionUtils { + "conditionExpression: {}", rawBsonDocument, conditionExpression); return false; } - return evaluateExpression(conditionExpression, rawBsonDocument, comparisonValuesDocument); + return evaluateExpression(conditionExpression, rawBsonDocument, comparisonValuesDocument, null); + } + + /** + * Evaluate the given condition expression on the BSON Document. + * + * @param conditionExpression The condition expression consisting of operands, operators. + * @param rawBsonDocument The BSON Document on which the condition expression is evaluated. + * @param comparisonValuesDocument The BSON Document consisting of place-holder key-value pairs. + * @param keyAliasDocument The BSON Document consisting of place-holder for keys. + * @return True if the evaluation is successful, False otherwise. 
+ */ + public static boolean evaluateConditionExpression(final String conditionExpression, + final RawBsonDocument rawBsonDocument, + final BsonDocument comparisonValuesDocument, + final BsonDocument keyAliasDocument) { + if (rawBsonDocument == null || conditionExpression == null) { + LOGGER.warn( + "Document and/or Condition Expression document are empty. Document: {}, " + + "conditionExpression: {}", rawBsonDocument, conditionExpression); + return false; + } + return evaluateExpression(conditionExpression, rawBsonDocument, comparisonValuesDocument, + keyAliasDocument); } /** @@ -79,11 +105,13 @@ public final class SQLComparisonExpressionUtils { * @param conditionExpression The condition expression consisting of operands, operators. * @param rawBsonDocument The BSON Document on which the condition expression is evaluated. * @param comparisonValuesDocument The BSON Document consisting of place-holder key-value pairs. + * @param keyAliasDocument The BSON Document consisting of place-holder for keys. * @return True if the evaluation is successful, False otherwise. 
*/ private static boolean evaluateExpression(final String conditionExpression, final RawBsonDocument rawBsonDocument, - final BsonDocument comparisonValuesDocument) { + final BsonDocument comparisonValuesDocument, + final BsonDocument keyAliasDocument) { BsonExpressionParser bsonExpressionParser = new BsonExpressionParser(conditionExpression); ParseNode parseNode; try { @@ -92,7 +120,15 @@ public final class SQLComparisonExpressionUtils { LOGGER.error("Expression {} could not be evaluated.", conditionExpression, e); throw new RuntimeException("Expression could not be evaluated: " + conditionExpression, e); } - return evaluateExpression(parseNode, rawBsonDocument, comparisonValuesDocument); + List<String> sortedKeyNames; + if (keyAliasDocument == null || keyAliasDocument.isEmpty()) { + sortedKeyNames = Collections.emptyList(); + } else { + sortedKeyNames = new ArrayList<>(keyAliasDocument.keySet()); + sortedKeyNames.sort((a, b) -> Integer.compare(b.length(), a.length())); + } + return evaluateExpression(parseNode, rawBsonDocument, comparisonValuesDocument, + keyAliasDocument, sortedKeyNames); } /** @@ -101,11 +137,17 @@ public final class SQLComparisonExpressionUtils { * @param parseNode The root ParseNode of the parse tree. * @param rawBsonDocument BSON Document value of the Cell. * @param comparisonValuesDocument BSON Document with place-holder values. + * @param keyAliasDocument The BSON Document consisting of place-holder for keys. + * @param sortedKeyNames The document key names in the descending sorted order of their string + * length. * @return True if the evaluation is successful, False otherwise. */ private static boolean evaluateExpression(final ParseNode parseNode, final RawBsonDocument rawBsonDocument, - final BsonDocument comparisonValuesDocument) { + final BsonDocument comparisonValuesDocument, + final BsonDocument keyAliasDocument, + final List<String> sortedKeyNames) { + // In Phoenix, usually every ParseNode has corresponding Expression class. 
The expression // is evaluated on the Tuple. However, the expression requires data type of PDataType instance. // This case is different: we need to evaluate the parse node on the document, not on Tuple. @@ -119,27 +161,31 @@ public final class SQLComparisonExpressionUtils { (DocumentFieldExistsParseNode) parseNode; final LiteralParseNode fieldKey = (LiteralParseNode) documentFieldExistsParseNode.getChildren().get(0); - final String fieldName = (String) fieldKey.getValue(); + String fieldName = (String) fieldKey.getValue(); + fieldName = replaceExpressionFieldNames(fieldName, keyAliasDocument, sortedKeyNames); return documentFieldExistsParseNode.isExists() == exists(fieldName, rawBsonDocument); } else if (parseNode instanceof EqualParseNode) { final EqualParseNode equalParseNode = (EqualParseNode) parseNode; final LiteralParseNode lhs = (LiteralParseNode) equalParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) equalParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return isEquals(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); } else if (parseNode instanceof NotEqualParseNode) { final NotEqualParseNode notEqualParseNode = (NotEqualParseNode) parseNode; final LiteralParseNode lhs = (LiteralParseNode) notEqualParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) notEqualParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return !isEquals(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); } else if (parseNode instanceof LessThanParseNode) { final LessThanParseNode 
lessThanParseNode = (LessThanParseNode) parseNode; final LiteralParseNode lhs = (LiteralParseNode) lessThanParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) lessThanParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return lessThan(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); } else if (parseNode instanceof LessThanOrEqualParseNode) { @@ -147,7 +193,8 @@ public final class SQLComparisonExpressionUtils { (LessThanOrEqualParseNode) parseNode; final LiteralParseNode lhs = (LiteralParseNode) lessThanOrEqualParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) lessThanOrEqualParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return lessThanOrEquals(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); @@ -156,7 +203,8 @@ public final class SQLComparisonExpressionUtils { (GreaterThanParseNode) parseNode; final LiteralParseNode lhs = (LiteralParseNode) greaterThanParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) greaterThanParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return greaterThan(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); } else if (parseNode instanceof GreaterThanOrEqualParseNode) { @@ -164,7 +212,8 @@ public final class SQLComparisonExpressionUtils { (GreaterThanOrEqualParseNode) parseNode; final LiteralParseNode 
lhs = (LiteralParseNode) greaterThanOrEqualParseNode.getLHS(); final LiteralParseNode rhs = (LiteralParseNode) greaterThanOrEqualParseNode.getRHS(); - final String fieldKey = (String) lhs.getValue(); + String fieldKey = (String) lhs.getValue(); + fieldKey = replaceExpressionFieldNames(fieldKey, keyAliasDocument, sortedKeyNames); final String expectedFieldValue = (String) rhs.getValue(); return greaterThanOrEquals(fieldKey, expectedFieldValue, rawBsonDocument, comparisonValuesDocument); @@ -174,7 +223,8 @@ public final class SQLComparisonExpressionUtils { (LiteralParseNode) betweenParseNode.getChildren().get(0); final LiteralParseNode lhs = (LiteralParseNode) betweenParseNode.getChildren().get(1); final LiteralParseNode rhs = (LiteralParseNode) betweenParseNode.getChildren().get(2); - final String fieldName = (String) fieldKey.getValue(); + String fieldName = (String) fieldKey.getValue(); + fieldName = replaceExpressionFieldNames(fieldName, keyAliasDocument, sortedKeyNames); final String expectedFieldValue1 = (String) lhs.getValue(); final String expectedFieldValue2 = (String) rhs.getValue(); return betweenParseNode.isNegate() != @@ -184,7 +234,8 @@ public final class SQLComparisonExpressionUtils { final InListParseNode inListParseNode = (InListParseNode) parseNode; final List<ParseNode> childrenNodes = inListParseNode.getChildren(); final LiteralParseNode fieldKey = (LiteralParseNode) childrenNodes.get(0); - final String fieldName = (String) fieldKey.getValue(); + String fieldName = (String) fieldKey.getValue(); + fieldName = replaceExpressionFieldNames(fieldName, keyAliasDocument, sortedKeyNames); final String[] inList = new String[childrenNodes.size() - 1]; for (int i = 1; i < childrenNodes.size(); i++) { LiteralParseNode literalParseNode = (LiteralParseNode) childrenNodes.get(i); @@ -196,7 +247,8 @@ public final class SQLComparisonExpressionUtils { AndParseNode andParseNode = (AndParseNode) parseNode; List<ParseNode> children = andParseNode.getChildren(); for 
(ParseNode node : children) { - if (!evaluateExpression(node, rawBsonDocument, comparisonValuesDocument)) { + if (!evaluateExpression(node, rawBsonDocument, comparisonValuesDocument, + keyAliasDocument, sortedKeyNames)) { return false; } } @@ -205,7 +257,8 @@ public final class SQLComparisonExpressionUtils { OrParseNode orParseNode = (OrParseNode) parseNode; List<ParseNode> children = orParseNode.getChildren(); for (ParseNode node : children) { - if (evaluateExpression(node, rawBsonDocument, comparisonValuesDocument)) { + if (evaluateExpression(node, rawBsonDocument, comparisonValuesDocument, keyAliasDocument, + sortedKeyNames)) { return true; } } @@ -213,13 +266,50 @@ public final class SQLComparisonExpressionUtils { } else if (parseNode instanceof NotParseNode) { NotParseNode notParseNode = (NotParseNode) parseNode; return !evaluateExpression(notParseNode.getChildren().get(0), rawBsonDocument, - comparisonValuesDocument); + comparisonValuesDocument, keyAliasDocument, sortedKeyNames); } else { throw new IllegalArgumentException("ParseNode " + parseNode + " is not recognized for " + "document comparison"); } } + /** + * Replaces expression field names with their corresponding actual field names. + * This method supports field name aliasing by replacing placeholder expressions + * with actual field names from the provided key alias document. + * <p>Expression field names allow users to reference field names using placeholder + * syntax, which is useful for:</p> + * <ul> + * <li>Avoiding conflicts with reserved words</li> + * <li>Handling field names with special characters</li> + * <li>Improving readability of complex expressions</li> + * <li>Supporting dynamic field name substitution</li> + * </ul> + * + * @param fieldKey the field key expression that may contain expression field names. + * @param keyAliasDocument the BSON document containing mappings from expression + * field names to actual field names. 
Each key should be + * an expression field name and each value should be a BsonString + * containing the actual field name. + * @param sortedKeys the list of expression field names sorted by length in + * descending order. + * @return the field key with all expression field names replaced by their + * corresponding actual field names. + */ + private static String replaceExpressionFieldNames(String fieldKey, + BsonDocument keyAliasDocument, + List<String> sortedKeys) { + String tmpFieldKey = fieldKey; + for (String expressionAttributeName : sortedKeys) { + if (tmpFieldKey.contains(expressionAttributeName)) { + String actualFieldName = + ((BsonString) keyAliasDocument.get(expressionAttributeName)).getValue(); + tmpFieldKey = tmpFieldKey.replace(expressionAttributeName, actualFieldName); + } + } + return tmpFieldKey; + } + /** * Returns true if the value of the field is comparable to the value represented by * {@code expectedFieldValue} as per the comparison operator represented by {@code compareOp}. diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java new file mode 100644 index 0000000000..350da23eae --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java @@ -0,0 +1,709 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.end2end; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.bson.BsonArray; +import org.bson.BsonBinary; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.RawBsonDocument; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Base64; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for BSON with expression field key alias. 
+ */ +@Category(ParallelStatsDisabledTest.class) +public class Bson5IT extends ParallelStatsDisabledIT { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static String getJsonString(String jsonFilePath) throws IOException { + URL fileUrl = Bson5IT.class.getClassLoader().getResource(jsonFilePath); + Preconditions.checkArgument(fileUrl != null, "File path " + jsonFilePath + " seems invalid"); + return FileUtils.readFileToString(new File(fileUrl.getFile()), Charset.defaultCharset()); + } + + /** + * Conditional Upserts for BSON pass where the Condition Expression is of SQL style. + */ + @Test + public void testBsonOpsWithSqlConditionsUpdateSuccess() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = generateUniqueName(); + String cdcName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "CREATE TABLE " + tableName + + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" + + " CONSTRAINT pk PRIMARY KEY(PK1))"; + String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute(ddl); + conn.createStatement().execute(cdcDdl); + IndexToolIT.runIndexTool(false, "", tableName, + "\"" + CDCUtil.getCDCIndexName(cdcName) + "\""); + Timestamp ts1 = new Timestamp(System.currentTimeMillis()); + Thread.sleep(100); + + String sample1 = getJsonString("json/sample_01.json"); + String sample2 = getJsonString("json/sample_02.json"); + String sample3 = getJsonString("json/sample_03.json"); + BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1); + BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2); + BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3); + + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0002"); + stmt.setObject(3, bsonDocument1); + stmt.executeUpdate(); + + stmt.setString(1, 
"pk1010"); + stmt.setString(2, "1010"); + stmt.setObject(3, bsonDocument2); + stmt.executeUpdate(); + + stmt.setString(1, "pk1011"); + stmt.setString(2, "1011"); + stmt.setObject(3, bsonDocument3); + stmt.executeUpdate(); + + conn.commit(); + Thread.sleep(100); + Timestamp ts2 = new Timestamp(System.currentTimeMillis()); + + testCDCAfterFirstUpsert(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3); + + ts1 = new Timestamp(System.currentTimeMillis()); + Thread.sleep(100); + + String conditionExpression = + "#press = :press AND #track[0].#shot[2][0].#city.#standard[50] = :softly"; + + //{ + // "$EXPR": "#press = :press AND #track[0].#shot[2][0].#city.#standard[50] = :softly", + // "$VAL": { + // ":press": "beat", + // ":softly": "softly" + // }, + // "$KEYS": { + // "#press": "press", + // "#track": "track", + // "#shot": "shot", + // "#city": "city", + // "#standard": "standard" + // } + //} + BsonDocument conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":press", new BsonString("beat")) + .append(":softly", new BsonString("softly"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#press", new BsonString("press")) + .append("#track", new BsonString("track")) + .append("#shot", new BsonString("shot")) + .append("#city", new BsonString("city")) + .append("#standard", new BsonString("standard"))); + + String query = "SELECT * FROM " + tableName + + " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT BSON_CONDITION_EXPRESSION(COL, '" + + conditionDoc.toJson() + "')"; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals("pk0001", rs.getString(1)); + assertEquals("0002", rs.getString(2)); + BsonDocument document1 = (BsonDocument) rs.getObject(3); + assertEquals(bsonDocument1, document1); + + assertFalse(rs.next()); + + conditionExpression = + "#press = :press AND 
#track[0].#shot[2][0].#city.#standard[5] = :softly"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":press", new BsonString("beat")) + .append(":softly", new BsonString("softly"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#press", new BsonString("press")) + .append("#track", new BsonString("track")) + .append("#shot", new BsonString("shot")) + .append("#city", new BsonString("city")) + .append("#standard", new BsonString("standard"))); + + query = "SELECT * FROM " + tableName + + " WHERE PK1 = ? AND C1 = ? AND BSON_CONDITION_EXPRESSION(COL, ?)"; + PreparedStatement ps = conn.prepareStatement(query); + ps.setString(1, "pk0001"); + ps.setString(2, "0002"); + ps.setObject(3, conditionDoc); + + rs = ps.executeQuery(); + + assertTrue(rs.next()); + assertEquals("pk0001", rs.getString(1)); + assertEquals("0002", rs.getString(2)); + document1 = (BsonDocument) rs.getObject(3); + assertEquals(bsonDocument1, document1); + + assertFalse(rs.next()); + + BsonDocument updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("browserling", + new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095))) + .append("track[0].shot[2][0].city.standard[5]", new BsonString("soft")) + .append("track[0].shot[2][0].city.problem[2]", + new BsonString("track[0].shot[2][0].city.problem[2] + 529.435"))) + .append("$UNSET", new BsonDocument() + .append("track[0].shot[2][0].city.flame", new BsonNull())); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END," + + " C1 = ?"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0003"); + stmt.executeUpdate(); + + updateExp = new BsonDocument() + .append("$ADD", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample10")), + new BsonBinary(Bytes.toBytes("Sample12")), + new BsonBinary(Bytes.toBytes("Sample13")), + new BsonBinary(Bytes.toBytes("Sample14")) + ))))) + .append("$DELETE_FROM_SET", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample02")), + new BsonBinary(Bytes.toBytes("Sample03")) + ))))) + .append("$SET", new BsonDocument() + .append("newrecord", ((BsonArray) (document1.get("track"))).get(0))) + .append("$UNSET", new BsonDocument() + .append("rather[3].outline.halfway.so[2][2]", new BsonNull())); + + conditionExpression = + "field_not_exists(newrecord) AND field_exists(#rather[3].#outline.#halfway.#so[2][2])"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument()); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#rather", new BsonString("rather")) + .append("#outline", new BsonString("outline")) + .append("#halfway", new BsonString("halfway")) + .append("#so", new BsonString("so"))); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1010"); + stmt.executeUpdate(); + + updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("result[1].location.state", new BsonString("AK"))) + .append("$UNSET", new BsonDocument() + .append("result[4].emails[1]", new BsonNull())); + + conditionExpression = + "#result[2].#location.#coordinates.#latitude > :latitude OR " + + "(field_exists(#result[1].#location) AND #result[1].#location.#state != :state" + + " AND field_exists(#result[4].#emails[1]))"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":latitude", new BsonDouble(0)) + .append(":state", new BsonString("AK"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#result", new BsonString("result")) + .append("#location", new BsonString("location")) + .append("#coordinates", new BsonString("coordinates")) + .append("#latitude", new BsonString("latitude")) + .append("#state", new BsonString("state")) + .append("#emails", new BsonString("emails"))); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1011"); + stmt.executeUpdate(); + + conn.commit(); + Thread.sleep(100); + ts2 = new Timestamp(System.currentTimeMillis()); + + testCDCPostUpdate(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3); + + query = "SELECT * FROM " + tableName; + rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals("pk0001", rs.getString(1)); + assertEquals("0003", rs.getString(2)); + document1 = (BsonDocument) rs.getObject(3); + + String updatedJson = getJsonString("json/sample_updated_01.json"); + assertEquals(RawBsonDocument.parse(updatedJson), document1); + + assertTrue(rs.next()); + assertEquals("pk1010", rs.getString(1)); + assertEquals("1010", rs.getString(2)); + BsonDocument document2 = (BsonDocument) rs.getObject(3); + + updatedJson = getJsonString("json/sample_updated_02.json"); + assertEquals(RawBsonDocument.parse(updatedJson), document2); + + assertTrue(rs.next()); + assertEquals("pk1011", rs.getString(1)); + assertEquals("1011", rs.getString(2)); + BsonDocument document3 = (BsonDocument) rs.getObject(3); + + updatedJson = getJsonString("json/sample_updated_03.json"); + assertEquals(RawBsonDocument.parse(updatedJson), document3); + + assertFalse(rs.next()); + } + } + + @Test + public void testBsonOpsWithSqlConditionsUpdateFailure() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = generateUniqueName(); + String cdcName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "CREATE TABLE " + tableName + + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" + + " CONSTRAINT pk PRIMARY KEY(PK1))"; + String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute(ddl); + 
conn.createStatement().execute(cdcDdl); + IndexToolIT.runIndexTool(false, "", tableName, + "\"" + CDCUtil.getCDCIndexName(cdcName) + "\""); + Timestamp ts1 = new Timestamp(System.currentTimeMillis()); + Thread.sleep(100); + + String sample1 = getJsonString("json/sample_01.json"); + String sample2 = getJsonString("json/sample_02.json"); + String sample3 = getJsonString("json/sample_03.json"); + BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1); + BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2); + BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3); + + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0002"); + stmt.setObject(3, bsonDocument1); + stmt.executeUpdate(); + + stmt.setString(1, "pk1010"); + stmt.setString(2, "1010"); + stmt.setObject(3, bsonDocument2); + stmt.executeUpdate(); + + stmt.setString(1, "pk1011"); + stmt.setString(2, "1011"); + stmt.setObject(3, bsonDocument3); + stmt.executeUpdate(); + + conn.commit(); + + Thread.sleep(100); + Timestamp ts2 = new Timestamp(System.currentTimeMillis()); + + testCDCAfterFirstUpsert(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3); + + Thread.sleep(100); + ts1 = new Timestamp(System.currentTimeMillis()); + Thread.sleep(100); + + String conditionExpression = + "#press = :press AND #track[0].#shot[2][0].#city.#standard[50] = :softly"; + + BsonDocument conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":press", new BsonString("beat")) + .append(":softly", new BsonString("softly"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#press", new BsonString("press")) + .append("#track", new BsonString("track")) + .append("#shot", new BsonString("shot")) + .append("#city", new BsonString("city")) + .append("#standard", new BsonString("standard"))); + + 
String query = "SELECT * FROM " + tableName + + " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT BSON_CONDITION_EXPRESSION(COL, '" + + conditionDoc.toJson() + "')"; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals("pk0001", rs.getString(1)); + assertEquals("0002", rs.getString(2)); + BsonDocument document1 = (BsonDocument) rs.getObject(3); + assertEquals(bsonDocument1, document1); + + assertFalse(rs.next()); + + conditionExpression = + "#press = :press AND #track[0].#shot[2][0].#city.#standard[5] <> :softly"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":press", new BsonString("beat")) + .append(":softly", new BsonString("softly"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#press", new BsonString("press")) + .append("#track", new BsonString("track")) + .append("#shot", new BsonString("shot")) + .append("#city", new BsonString("city")) + .append("#standard", new BsonString("standard"))); + + BsonDocument updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("browserling", + new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095))) + .append("track[0].shot[2][0].city.standard[5]", new BsonString("soft")) + .append("track[0].shot[2][0].city.problem[2]", + new BsonString("track[0].shot[2][0].city.problem[2] + 529.435"))) + .append("$UNSET", new BsonDocument() + .append("track[0].shot[2][0].city.flame", new BsonNull())); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END," + + " C1 = ?"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0003"); + stmt.executeUpdate(); + + updateExp = new BsonDocument() + .append("$ADD", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample10")), + new BsonBinary(Bytes.toBytes("Sample12")), + new BsonBinary(Bytes.toBytes("Sample13")), + new BsonBinary(Bytes.toBytes("Sample14")) + ))))) + .append("$DELETE_FROM_SET", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample02")), + new BsonBinary(Bytes.toBytes("Sample03")) + ))))) + .append("$SET", new BsonDocument() + .append("newrecord", ((BsonArray) (document1.get("track"))).get(0))) + .append("$UNSET", new BsonDocument() + .append("rather[3].outline.halfway.so[2][2]", new BsonNull())); + + conditionExpression = + "field_not_exists(newrecord) AND field_exists(#rather[3].#outline.#halfway.#so[2][20])"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument()); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#rather", new BsonString("rather")) + .append("#outline", new BsonString("outline")) + .append("#halfway", new BsonString("halfway")) + .append("#so", new BsonString("so"))); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1010"); + stmt.executeUpdate(); + + updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("result[1].location.state", new BsonString("AK"))) + .append("$UNSET", new BsonDocument() + .append("result[4].emails[1]", new BsonNull())); + + conditionExpression = + "#result[2].#location.#coordinates.#latitude > :latitude OR " + + "(field_exists(#result[1].#location) AND #result[1].#location.#state != :state" + + " AND field_not_exists(#result[4].#emails[1]))"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append(":latitude", new BsonDouble(0)) + .append(":state", new BsonString("AK"))); + conditionDoc.put("$KEYS", new BsonDocument() + .append("#result", new BsonString("result")) + .append("#location", new BsonString("location")) + .append("#coordinates", new BsonString("coordinates")) + .append("#latitude", new BsonString("latitude")) + .append("#state", new BsonString("state")) + .append("#emails", new BsonString("emails"))); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1011"); + stmt.executeUpdate(); + + conn.commit(); + + Thread.sleep(100); + ts2 = new Timestamp(System.currentTimeMillis()); + + testCDCUpdateOneRowChange(conn, cdcName, ts1, ts2, bsonDocument1); + + query = "SELECT * FROM " + tableName; + rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals("pk0001", rs.getString(1)); + assertEquals("0003", rs.getString(2)); + document1 = (BsonDocument) rs.getObject(3); + + assertEquals(bsonDocument1, document1); + + assertTrue(rs.next()); + assertEquals("pk1010", rs.getString(1)); + assertEquals("1010", rs.getString(2)); + BsonDocument document2 = (BsonDocument) rs.getObject(3); + + assertEquals(bsonDocument2, document2); + + assertTrue(rs.next()); + assertEquals("pk1011", rs.getString(1)); + assertEquals("1011", rs.getString(2)); + BsonDocument document3 = (BsonDocument) rs.getObject(3); + + assertEquals(bsonDocument3, document3); + + assertFalse(rs.next()); + } + } + + private static void testCDCAfterFirstUpsert(Connection conn, String cdcName, Timestamp ts1, + Timestamp ts2, + BsonDocument bsonDocument1, + BsonDocument bsonDocument2, + BsonDocument bsonDocument3) + throws SQLException, JsonProcessingException { + try (PreparedStatement pst = conn.prepareStatement( + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? 
AND PHOENIX_ROW_TIMESTAMP() <= ?")) { + pst.setTimestamp(1, ts1); + pst.setTimestamp(2, ts2); + + ResultSet rs = pst.executeQuery(); + Assert.assertTrue(rs.next()); + + String cdcVal = rs.getString(3); + Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + Map<String, Object> preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + Assert.assertNull(preImage.get("COL")); + Map<String, Object> postImage = + (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + String encodedBytes = (String) postImage.get("COL"); + byte[] bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument r1 = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument1, r1); + + Assert.assertTrue(rs.next()); + + cdcVal = rs.getString(3); + map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + Assert.assertNull(preImage.get("COL")); + postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument r2 = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument2, r2); + + Assert.assertTrue(rs.next()); + + cdcVal = rs.getString(3); + map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + Assert.assertNull(preImage.get("COL")); + postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument r3 = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument3, r3); + + Assert.assertFalse(rs.next()); + } + } + + private static void testCDCPostUpdate(Connection conn, String cdcName, Timestamp ts1, + Timestamp ts2, BsonDocument bsonDocument1, + BsonDocument bsonDocument2, + BsonDocument 
bsonDocument3) + throws SQLException, IOException { + ResultSet rs; + try (PreparedStatement pst = conn.prepareStatement( + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() <= ?")) { + pst.setTimestamp(1, ts1); + pst.setTimestamp(2, ts2); + + rs = pst.executeQuery(); + Assert.assertTrue(rs.next()); + + String cdcVal = rs.getString(3); + Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + Map<String, Object> preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + String encodedBytes = (String) preImage.get("COL"); + byte[] bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument preDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument1, preDoc); + + Map<String, Object> postImage = + (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument postDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(RawBsonDocument.parse(getJsonString("json/sample_updated_01.json")), + postDoc); + + Assert.assertTrue(rs.next()); + + cdcVal = rs.getString(3); + map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + encodedBytes = (String) preImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + preDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument2, preDoc); + + postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + postDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(RawBsonDocument.parse(getJsonString("json/sample_updated_02.json")), + postDoc); + + Assert.assertTrue(rs.next()); + + cdcVal = rs.getString(3); + map = 
OBJECT_MAPPER.readValue(cdcVal, Map.class); + preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + encodedBytes = (String) preImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + preDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument3, preDoc); + + postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + postDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(RawBsonDocument.parse(getJsonString("json/sample_updated_03.json")), + postDoc); + + Assert.assertFalse(rs.next()); + } + } + + private static void testCDCUpdateOneRowChange(Connection conn, String cdcName, Timestamp ts1, + Timestamp ts2, BsonDocument bsonDocument1) + throws SQLException, IOException { + try (PreparedStatement pst = conn.prepareStatement( + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? 
AND PHOENIX_ROW_TIMESTAMP() <= ?")) { + pst.setTimestamp(1, ts1); + pst.setTimestamp(2, ts2); + + ResultSet rs = pst.executeQuery(); + Assert.assertTrue(rs.next()); + + String cdcVal = rs.getString(3); + Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + Map<String, Object> preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + String encodedBytes = (String) preImage.get("COL"); + byte[] bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument preDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument1, preDoc); + + Map<String, Object> postImage = + (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + encodedBytes = (String) postImage.get("COL"); + bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument postDoc = new RawBsonDocument(bytes, 0, bytes.length); + Assert.assertEquals(bsonDocument1, postDoc); + + Assert.assertFalse(rs.next()); + } + } + +} \ No newline at end of file diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtils2Test.java b/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtils2Test.java new file mode 100644 index 0000000000..16311375cd --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtils2Test.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.util.bson; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.util.bson.SQLComparisonExpressionUtils; +import org.bson.BsonBinary; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonString; +import org.bson.RawBsonDocument; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for BSON Condition Expression Utility. + */ +public class ComparisonExpressionUtils2Test { + + @Test + public void testSQLComparisonExpression1() { + RawBsonDocument rawBsonDocument = getDocumentValue(); + BsonDocument compareValues = getCompareValDocument(); + BsonDocument keyNames = getKeyNamesDocument(); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "(field_exists(#id) OR field_not_exists(#title))", rawBsonDocument, compareValues, + keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_not_exists(#id) AND field_not_exists(#title1)) OR field_exists(#isbn2))" + + " OR ((#id <> :title) AND ((#4 = :inpublication) OR ((#isbn = :isbn)" + + " AND (#title = :title))))", rawBsonDocument, compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_exists(#nestedmap1.#isbn) AND field_not_exists(#nestedmap1.#nlist1[3])))", + rawBsonDocument, compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( 
+ "#nestedmap1.#id = :id AND (#nestedmap1.#4 = :inpublication)", rawBsonDocument, + compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((#nestedmap1.#id = :id) AND ((#nestedmap1.#4[0] = :inpublication) OR " + + "((#isbn[0] = :isbn) AND (#title = :title))) OR " + + "(#nestedmap1.#nlist1[0] = :nmap1_nlist1))", rawBsonDocument, compareValues, + keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_not_exists(#id) AND field_not_exists(#title1)) OR field_exists(#isbn2))" + + " OR ((#nestedmap1.#id = :id) AND ((#nestedmap1.#4 = :inpublication)" + + " OR ((#isbn = :isbn) AND (#title = :title))))", rawBsonDocument, + compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] <= :nestedlist1_485 AND #nestedlist1[1] > :nestedlist1_1 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + + "#nestedlist1[2][1].#id < :id1 AND #ids < :ids1 AND #id2 > :id2 AND #nestedmap1" + + ".#nlist1[2] > :nestedmap1_nlist1_3", rawBsonDocument, compareValues, + keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] <= :nestedlist1_485 AND #nestedlist1[1] >= :nestedlist1_1 AND " + + "#8 >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id <= :id1 AND #ids <= :ids1 AND #id2 >= :id2 AND" + + " #nestedmap1.#nlist1[2] >= :nestedmap1_nlist1_3", rawBsonDocument, + compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] < :nestedlist1_10 AND" + + " #nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id > :id10 AND #ids > :ids10 AND #id2 < :id20 AND " + + "#nestedmap1.#nlist1[2] < :nestedmap1_nlist1_30", rawBsonDocument, + compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND 
#nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id >= :id10 AND #ids >= :ids10 AND #id2 <= :id20 AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "#nestedmap1.#nlist1[2] <> :nestedmap1_nlist1_30", rawBsonDocument, + compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id >= :id10 AND #ids >= :ids10 AND #id2 <= :id20 AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR #nestedlist1[0] BETWEEN " + + ":nestedlist1_4850 AND :id2)", rawBsonDocument, compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedmap1.#nlist1[0] IN (:id, :id1, :id20, :nmap1_nlist1) AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR #nestedlist1[0] BETWEEN " + + ":nestedlist1_4850 AND :id2)", rawBsonDocument, compareValues, keyNames)); + + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedmap1.#nlist1[0] IN (:id, :id1, :id20, :nmap1_nlist1) AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR " + + " #nestedlist1[0] BETWEEN :nestedlist1_4850 AND :id2)" + + " AND NOT #nestedmap1.#4 IN (:id, :id1, :id20, :id21) AND #7 = :7 " + + "AND #10 >= :id2 AND #11 <= :id2 AND #12 = :id2 AND #13 = :id2 AND " + + "NOT #14 <> :id2", + rawBsonDocument, 
compareValues, keyNames)); + } + + /** + * Test that directly uses executable expression to reduce the dependency on pattern-matcher. + */ + @Test + public void testSQLComparisonExpression2() { + RawBsonDocument rawBsonDocument = getDocumentValue(); + BsonDocument compareValues = getCompareValDocument(); + BsonDocument keyNames = getKeyNamesDocument(); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "(field_not_exists(#id) OR field_not_exists(#title))", rawBsonDocument, compareValues, + keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_not_exists(#id) AND field_not_exists(#title1)) OR field_exists(#isbn2))" + + " OR ((#id = :title) AND ((#4 = #4) OR ((#isbn = :isbn)" + + " AND (#title = :title))))", rawBsonDocument, compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_exists(#nestedmap1.#isbn) AND field_exists(#nestedmap1.#nlist1[3])))", + rawBsonDocument, compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedmap1.#id = :id AND (#nestedmap1.#4 <> :inpublication)", rawBsonDocument, + compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((#nestedmap1.#id = :id) AND ((#nestedmap1.#4[0] = :inpublication) OR " + + "((#isbn[0] = :isbn) AND (#title = :title))) OR " + + "(#nestedmap1.#nlist1[0] <> :nmap1_nlist1))", rawBsonDocument, compareValues, + keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((field_not_exists(#id) AND field_not_exists(#title1)) OR field_exists(#isbn2))" + + " OR ((#nestedmap1.#id = :id) AND ((#nestedmap1.#4 <> :inpublication)" + + " OR NOT ((#isbn = :isbn) AND (#title = :title))))", rawBsonDocument, + compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] <= :nestedlist1_485 AND #nestedlist1[1] > 
:nestedlist1_1 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + + "#nestedlist1[2][1].#id < :id1 AND #ids < :ids1 AND #id2 > :id2 AND #nestedmap1" + + ".#nlist1[2] < :nestedmap1_nlist1_3", rawBsonDocument, compareValues, + keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] <= :nestedlist1_485 AND #nestedlist1[1] >= :nestedlist1_1 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + + "#nestedlist1[2][1].#id <= :id1 AND #ids <= :ids1 AND #id2 >= :id2 AND #nestedmap1" + + ".#nlist1[2] < :nestedmap1_nlist1_3", rawBsonDocument, compareValues, + keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] < :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + + "#nestedlist1[2][1].#id > :id10 AND #ids > :ids10 AND #id2 < :id20 AND #nestedmap1" + + ".#nlist1[2] >= :nestedmap1_nlist1_30", rawBsonDocument, compareValues, + keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id >= :id10 AND #ids >= :ids10 AND #id2 <= :id20 AND " + + "#nestedmap1.#nlist1[2] > :nestedmap1_nlist1_30 AND " + + "#nestedmap1.#nlist1[2] <> :nestedmap1_nlist1_30", rawBsonDocument, + compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedlist1[2][1].#id >= :id10 AND #ids >= :ids10 AND #id2 <= :id20 AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR NOT #nestedlist1[0] BETWEEN " + + ":nestedlist1_4850 AND :id2)", rawBsonDocument, compareValues, 
keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedmap1.#nlist1[0] NOT IN (:id, :id1, :id20, :nmap1_nlist1) AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR #nestedlist1[0] BETWEEN " + + ":nestedlist1_4850 AND :id2)", rawBsonDocument, compareValues, keyNames)); + + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "#nestedlist1[0] >= :nestedlist1_4850 AND #nestedlist1[1] <= :nestedlist1_10 AND " + + "#nestedlist1[2][0] >= :nestedlist1_xyz0123 AND " + + "#nestedmap1.#nlist1[0] IN (:id, :id1, :id20, :nmap1_nlist1) AND " + + "#nestedmap1.#nlist1[2] <= :nestedmap1_nlist1_30 AND " + + "(#nestedmap1.#nlist1[2] = :nestedmap1_nlist1_30 OR " + + " #nestedlist1[0] NOT BETWEEN :nestedlist1_4850 AND :id2)" + + " AND #nestedmap1.#4 IN (:id, :id1, :id20, :id21)", + rawBsonDocument, compareValues, keyNames)); + } + + private static BsonDocument getCompareValDocument() { + BsonDocument compareValues = new BsonDocument(); + compareValues.append(":id20", new BsonDouble(101.011)); + compareValues.append(":id2", new BsonInt32(12)); + compareValues.append(":nestedlist1_10", new BsonString("1234abce")); + compareValues.append(":id1", new BsonInt32(120)); + compareValues.append(":id10", new BsonInt32(101)); + compareValues.append(":gt", new BsonInt32(100)); + compareValues.append(":ids1", new BsonString("12")); + compareValues.append(":isbn", new BsonString("111-1111111111")); + compareValues.append(":nestedlist1_xyz0123", new BsonString("xyz0123")); + compareValues.append(":nestedlist1_485", new BsonDouble(-485.33)); + compareValues.append(":nestedmap1_nlist1_30", new BsonBinary(Bytes.toBytes("Whitee"))); + compareValues.append(":inpublication", new BsonBoolean(false)); + compareValues.append(":ids10", new 
BsonString("100")); + compareValues.append(":nestedmap1_nlist1_3", new BsonBinary(Bytes.toBytes("Whit"))); + compareValues.append(":nestedlist1_1", new BsonString("1234abcc")); + compareValues.append(":nmap1_nlist1", new BsonString("NListVal01")); + compareValues.append(":nestedlist1_4850", new BsonDouble(-485.35)); + compareValues.append(":id", new BsonDouble(101.01)); + compareValues.append(":title", new BsonString("Book 101 Title")); + compareValues.append(":zero", new BsonInt32(0)); + compareValues.append(":id21", new BsonInt32(121)); + compareValues.append(":7", new BsonString("Name_")); + return compareValues; + } + + private static BsonDocument getKeyNamesDocument() { + BsonDocument keyNames = new BsonDocument(); + keyNames.append("#id", new BsonString("Id")); + keyNames.append("#title", new BsonString("Title")); + keyNames.append("#title1", new BsonString("Title1")); + keyNames.append("#isbn", new BsonString("ISBN")); + keyNames.append("#isbn2", new BsonString("ISBN2")); + keyNames.append("#4", new BsonString("InPublication")); + keyNames.append("#nestedmap1", new BsonString("NestedMap1")); + keyNames.append("#nestedlist1", new BsonString("NestedList1")); + keyNames.append("#nlist1", new BsonString("NList1")); + keyNames.append("#ids", new BsonString("IdS")); + keyNames.append("#id2", new BsonString("Id2")); + keyNames.append("#7", new BsonString("Id.Name")); + keyNames.append("#8", new BsonString("NestedList1[2][0]")); + keyNames.append("#10", new BsonString(">")); + keyNames.append("#11", new BsonString("[")); + keyNames.append("#12", new BsonString("#")); + keyNames.append("#13", new BsonString("~")); + keyNames.append("#14", new BsonString("^")); + return keyNames; + } + + private static RawBsonDocument getDocumentValue() { + String json = "{\n" + + " \"InPublication\" : false,\n" + + " \"ISBN\" : \"111-1111111111\",\n" + + " \"NestedList1\" : [ -485.34, \"1234abcd\", [ \"xyz0123\", {\n" + + " \"InPublication\" : false,\n" + + " \"ISBN\" : 
\"111-1111111111\",\n" + + " \"Title\" : \"Book 101 Title\",\n" + + " \"Id\" : 101.01\n" + + " } ] ],\n" + + " \"NestedMap1\" : {\n" + + " \"InPublication\" : false,\n" + + " \"ISBN\" : \"111-1111111111\",\n" + + " \"Title\" : \"Book 101 Title\",\n" + + " \"Id\" : 101.01,\n" + + " \"NList1\" : [ \"NListVal01\", -23.4, {\n" + + " \"$binary\" : {\n" + + " \"base64\" : \"V2hpdGU=\",\n" + + " \"subType\" : \"00\"\n" + + " }\n" + + " } ]\n" + + " },\n" + + " \"Id2\" : 101.01,\n" + + " \"Id.Name\" : \"Name_\",\n" + + " \"IdS\" : \"101.01\",\n" + + " \">\" : 12,\n " + + " \"[\" : 12,\n " + + " \"#\" : 12,\n " + + " \"~\" : 12,\n " + + " \"^\" : 12,\n " + + " \"Title\" : \"Book 101 Title\",\n" + + " \"Id\" : 101.01\n" + + "}"; + return RawBsonDocument.parse(json); + } + +} \ No newline at end of file diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtilsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtilsTest.java index 223ae0db93..b06d815000 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtilsTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/bson/ComparisonExpressionUtilsTest.java @@ -66,6 +66,11 @@ public class ComparisonExpressionUtilsTest { "NestedMap1.Id = $Id AND (NestedMap1.InPublication = InPublication)", rawBsonDocument, compareValues)); + assertFalse(SQLComparisonExpressionUtils.evaluateConditionExpression( + "((attribute_exists(0) AND attribute_exists(1)) AND attribute_not_exists(#3)) AND ((" + + "(attribute_not_exists(0) AND attribute_not_exists(1)) OR attribute_exists(#0)" + + ") OR (#_ = :0))", rawBsonDocument, compareValues)); + assertTrue(SQLComparisonExpressionUtils.evaluateConditionExpression( "((NestedMap1.Id = $Id) AND ((NestedMap1.InPublication[0] = InPublication) OR " + "((ISBN[0] = :ISBN) AND (Title = #Title))) OR " @@ -1570,6 +1575,7 @@ public class ComparisonExpressionUtilsTest { " \"#NestedList1_10\" : 
\"1234abce\",\n" + " \"$Id1\" : 120,\n" + " \"$Id10\" : 101,\n" + + " \":>\" : 100,\n " + " \"$Ids1\" : \"12\",\n" + " \":ISBN\" : \"111-1111111111\",\n" + " \"#NestedList1_xyz0123\" : \"xyz0123\",\n" + @@ -1651,6 +1657,7 @@ public class ComparisonExpressionUtilsTest { " },\n" + " \"Id2\" : 101.01,\n" + " \"IdS\" : \"101.01\",\n" + + " \">\" : 12,\n " + " \"Title\" : \"Book 101 Title\",\n" + " \"Id\" : 101.01\n" + "}";