This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new f881db285b PHOENIX-7343: Support ARRAY and JSON as simple values in CDC (#1965)
f881db285b is described below
commit f881db285bc5b464719d9927a4418a8a154007f2
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Fri Sep 20 03:31:21 2024 +0530
PHOENIX-7343: Support ARRAY and JSON as simple values in CDC (#1965)
---
.../main/java/org/apache/phoenix/util/CDCUtil.java | 28 +++++++++++++---------
.../coprocessor/CDCGlobalIndexRegionScanner.java | 7 +-----
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 25 ++++++++++++++++++-
.../org/apache/phoenix/end2end/CDCQueryIT.java | 2 ++
4 files changed, 44 insertions(+), 18 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 38a1173ace..96641c2dcf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.bson.RawBsonDocument;
@@ -122,26 +121,33 @@ public class CDCUtil {
public static Object getColumnEncodedValue(Object value, PDataType dataType) {
if (value != null) {
- // TODO: Need suport for DECIMAL, NUMERIC and array types.
if (dataType.getSqlType() == PDataType.BSON_TYPE) {
value = Bytes.toBytes(((RawBsonDocument) value).getByteBuffer().asNIO());
- } else if (dataType.getSqlType() == Types.BINARY ||
- dataType.getSqlType() == Types.VARBINARY
- || dataType.getSqlType() == Types.LONGVARBINARY
- || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE) {
+ } else if (isBinaryType(dataType)) {
// Unfortunately, Base64.Encoder has no option to specify offset and length so can't
// avoid copying bytes.
value = Base64.getEncoder().encodeToString((byte[]) value);
} else {
- if (dataType.getSqlType() == Types.DATE
- || dataType.getSqlType() == Types.TIMESTAMP
- || dataType.getSqlType() == Types.TIME
- || dataType.getSqlType() == Types.TIME_WITH_TIMEZONE
- || dataType.getSqlType() == Types.TIMESTAMP_WITH_TIMEZONE) {
+ int sqlType = dataType.getSqlType();
+ if (sqlType == Types.DATE
+ || sqlType == Types.TIMESTAMP
+ || sqlType == Types.TIME
+ || sqlType == Types.TIME_WITH_TIMEZONE
+ || dataType.isArrayType()
+ || sqlType == PDataType.JSON_TYPE
+ || sqlType == Types.TIMESTAMP_WITH_TIMEZONE) {
value = value.toString();
}
}
}
return value;
}
+
+ public static boolean isBinaryType(PDataType dataType) {
+ int sqlType = dataType.getSqlType();
+ return (sqlType == Types.BINARY
+ || sqlType == Types.VARBINARY
+ || sqlType == Types.LONGVARBINARY
+ || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 42375beeee..f0c20f1ccf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -18,8 +18,6 @@
package org.apache.phoenix.coprocessor;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -48,10 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.sql.Types;
import java.util.Arrays;
-import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -257,7 +252,7 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
Object value;
- if (dataType.getSqlType() == Types.BINARY) {
+ if (CDCUtil.isBinaryType(dataType)) {
value = ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length);
} else {
value = dataType.toObject(cellValue, offset, length);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index c554da2079..c1ef1c399f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -17,12 +17,16 @@
*/
package org.apache.phoenix.end2end;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -32,6 +36,7 @@ import org.apache.phoenix.schema.types.PBinaryBase;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
@@ -40,6 +45,7 @@ import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -78,6 +84,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(CDCBaseIT.class);
protected static final ObjectMapper mapper = new ObjectMapper();
static {
+ SimpleModule module = new SimpleModule("ChangeRow", new Version(1, 0, 0, null, null, null));
+ PhoenixArraySerializer phoenixArraySerializer = new PhoenixArraySerializer(PhoenixArray.class);
+ module.addSerializer(PhoenixArray.class, phoenixArraySerializer);
+ mapper.registerModule(module);
mapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true);
}
@@ -825,4 +835,17 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
}
};
+
+ public static class PhoenixArraySerializer extends StdSerializer<PhoenixArray> {
+ protected PhoenixArraySerializer(Class<PhoenixArray> t) {
+ super(t);
+ }
+
+ @Override
+ public void serialize(PhoenixArray value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("elements", value.toString());
+ gen.writeEndObject();
+ }
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index e36851a742..8f675d9d90 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -257,6 +257,8 @@ public class CDCQueryIT extends CDCBaseIT {
put("V7", "TIMESTAMP");
put("V8", "VARBINARY");
put("V9", "BINARY");
+ put("V10", "VARCHAR ARRAY");
+ put("V11", "JSON");
}};
try (Connection conn = newConnection()) {
createTable(conn, tableName, pkColumns, dataColumns, multitenant,
encodingScheme,