This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f881db285b PHOENIX-7343: Support ARRAY and JSON as simple values in CDC (#1965)
f881db285b is described below

commit f881db285bc5b464719d9927a4418a8a154007f2
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Fri Sep 20 03:31:21 2024 +0530

    PHOENIX-7343: Support ARRAY and JSON as simple values in CDC (#1965)
---
 .../main/java/org/apache/phoenix/util/CDCUtil.java | 28 +++++++++++++---------
 .../coprocessor/CDCGlobalIndexRegionScanner.java   |  7 +-----
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java | 25 ++++++++++++++++++-
 .../org/apache/phoenix/end2end/CDCQueryIT.java     |  2 ++
 4 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 38a1173ace..96641c2dcf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
 import org.bson.RawBsonDocument;
@@ -122,26 +121,33 @@ public class CDCUtil {
 
     public static Object getColumnEncodedValue(Object value, PDataType dataType) {
         if (value != null) {
-            // TODO: Need suport for DECIMAL, NUMERIC and array types.
             if (dataType.getSqlType() == PDataType.BSON_TYPE) {
                 value = Bytes.toBytes(((RawBsonDocument) value).getByteBuffer().asNIO());
-            } else if (dataType.getSqlType() == Types.BINARY ||
-                    dataType.getSqlType() == Types.VARBINARY
-                    || dataType.getSqlType() == Types.LONGVARBINARY
-                    || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE) {
+            } else if (isBinaryType(dataType)) {
                 // Unfortunately, Base64.Encoder has no option to specify offset and length so can't
                 // avoid copying bytes.
                 value = Base64.getEncoder().encodeToString((byte[]) value);
             } else {
-                if (dataType.getSqlType() == Types.DATE
-                        || dataType.getSqlType() == Types.TIMESTAMP
-                        || dataType.getSqlType() == Types.TIME
-                        || dataType.getSqlType() == Types.TIME_WITH_TIMEZONE
-                        || dataType.getSqlType() == Types.TIMESTAMP_WITH_TIMEZONE) {
+                int sqlType = dataType.getSqlType();
+                if (sqlType == Types.DATE
+                        || sqlType == Types.TIMESTAMP
+                        || sqlType == Types.TIME
+                        || sqlType == Types.TIME_WITH_TIMEZONE
+                        || dataType.isArrayType()
+                        || sqlType == PDataType.JSON_TYPE
+                        || sqlType == Types.TIMESTAMP_WITH_TIMEZONE) {
                     value = value.toString();
                 }
             }
         }
         return value;
     }
+
+    public static boolean isBinaryType(PDataType dataType) {
+        int sqlType = dataType.getSqlType();
+        return (sqlType == Types.BINARY
+                || sqlType == Types.VARBINARY
+                || sqlType == Types.LONGVARBINARY
+                || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
+    }
 }
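
For a post-patch view of the client-side change, the encoding decision in CDCUtil.getColumnEncodedValue now reads roughly as follows (a simplified sketch assembled from the hunk above; the BSON branch and the remaining temporal types are elided):

    public static Object getColumnEncodedValue(Object value, PDataType dataType) {
        if (value == null) {
            return value;
        }
        if (isBinaryType(dataType)) {
            // BINARY, VARBINARY, LONGVARBINARY and VARBINARY_ENCODED become Base64 strings
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        int sqlType = dataType.getSqlType();
        if (dataType.isArrayType() || sqlType == PDataType.JSON_TYPE
                || sqlType == Types.DATE || sqlType == Types.TIMESTAMP /* ... */) {
            // ARRAY and JSON now join the temporal types in being emitted as plain strings
            return value.toString();
        }
        return value;
    }
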
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 42375beeee..f0c20f1ccf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -18,8 +18,6 @@
 package org.apache.phoenix.coprocessor;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilder;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -48,10 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.sql.Types;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -257,7 +252,7 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
 
     private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
         Object value;
-        if (dataType.getSqlType() == Types.BINARY) {
+        if (CDCUtil.isBinaryType(dataType)) {
             value = ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length);
         } else {
             value = dataType.toObject(cellValue, offset, length);
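
Seen whole, the server-side change is just a broader guard in the scanner; the resulting method (sketched from the hunk above) is:

    private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
        if (CDCUtil.isBinaryType(dataType)) {
            // every binary flavour keeps its raw bytes here; CDCUtil Base64-encodes them later
            return ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length);
        }
        // non-binary values (including arrays and JSON) are materialized as Java objects
        return dataType.toObject(cellValue, offset, length);
    }
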
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index c554da2079..c1ef1c399f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -17,12 +17,16 @@
  */
 package org.apache.phoenix.end2end;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -32,6 +36,7 @@ import org.apache.phoenix.schema.types.PBinaryBase;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
@@ -40,6 +45,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -78,6 +84,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     private static final Logger LOGGER = LoggerFactory.getLogger(CDCBaseIT.class);
     protected static final ObjectMapper mapper = new ObjectMapper();
     static {
+        SimpleModule module = new SimpleModule("ChangeRow", new Version(1, 0, 0, null, null, null));
+        PhoenixArraySerializer phoenixArraySerializer = new PhoenixArraySerializer(PhoenixArray.class);
+        module.addSerializer(PhoenixArray.class, phoenixArraySerializer);
+        mapper.registerModule(module);
         mapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true);
     }
 
@@ -825,4 +835,17 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
         }
     };
+
+    public static class PhoenixArraySerializer extends StdSerializer<PhoenixArray> {
+        protected PhoenixArraySerializer(Class<PhoenixArray> t) {
+            super(t);
+        }
+
+        @Override
+        public void serialize(PhoenixArray value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+            gen.writeStartObject();
+            gen.writeStringField("elements", value.toString());
+            gen.writeEndObject();
+        }
+    }
 }
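
The serializer registered above only affects how the test's ObjectMapper renders array values; in isolation the setup amounts to the following (names mirror the test code, and the resulting JSON shape follows from the serialize() body):

    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule("ChangeRow", new Version(1, 0, 0, null, null, null));
    module.addSerializer(PhoenixArray.class, new PhoenixArraySerializer(PhoenixArray.class));
    mapper.registerModule(module);
    // A PhoenixArray met while serializing a change row is now written as
    //   {"elements":"<PhoenixArray.toString() output>"}
    // instead of going through Jackson's default bean introspection.
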
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index e36851a742..8f675d9d90 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -257,6 +257,8 @@ public class CDCQueryIT extends CDCBaseIT {
             put("V7", "TIMESTAMP");
             put("V8", "VARBINARY");
             put("V9", "BINARY");
+            put("V10", "VARCHAR ARRAY");
+            put("V11", "JSON");
         }};
         try (Connection conn = newConnection()) {
             createTable(conn, tableName, pkColumns, dataColumns, multitenant, encodingScheme,
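
In DDL terms, the columns the test now round-trips through CDC correspond to something like the sketch below (a hedged illustration only: the table and CDC names are made up, the real test builds its DDL from the column map above, and CREATE CDC is assumed to be the Phoenix CDC DDL):

    try (Connection conn = DriverManager.getConnection(url);   // url is hypothetical
         Statement stmt = conn.createStatement()) {
        stmt.execute("CREATE TABLE SAMPLE_TABLE (K INTEGER PRIMARY KEY, "
                + "V10 VARCHAR ARRAY, V11 JSON)");              // plus the existing V1..V9 columns
        stmt.execute("CREATE CDC SAMPLE_CDC ON SAMPLE_TABLE");
        // Change images for SAMPLE_TABLE should now carry V10 as the array's string form
        // and V11 as the JSON document's string form (see CDCUtil.getColumnEncodedValue).
    }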
