This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-pxf.git

commit 9133f90bd46d92a8fff941cf754eedeceeae4307
Author: Ashuka Xue <[email protected]>
AuthorDate: Wed Sep 7 12:40:15 2022 -0700

    Add JsonProtocolHandler to use HdfsFileFragmenter for multi-line JSON (#858)
    
    This commit fixes an issue that could occur with multi-line JSON files.
    
    Previously, a fragment could parse a JSON object incorrectly when its split
    started in the middle of a string, producing wrong results. Multi-line JSON
    reads (those that set the IDENTIFIER option) now use HdfsFileFragmenter,
    which creates one fragment per file so that no JSON object is split.
---
 .../pxf/plugins/hdfs/HdfsFileFragmenter.java       |  2 +-
 .../pxf/plugins/json/JsonProtocolHandler.java      | 41 ++++++++++++++++
 .../pxf/plugins/json/JsonProtocolHandlerTest.java  | 57 ++++++++++++++++++++++
 .../pxf/plugins/s3/S3ProtocolHandler.java          |  7 ++-
 .../pxf/plugins/s3/S3ProtocolHandlerTest.java      | 34 +++++++++++++
 .../src/main/resources/pxf-profiles-default.xml    |  6 +++
 6 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java
index 26164449..efcb746c 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java
@@ -47,7 +47,7 @@ public class HdfsFileFragmenter extends HdfsDataFragmenter {
         }
 
         fragments = Arrays.stream(fileStatusArray)
-                .map(fileStatus -> new 
Fragment(fileStatus.getPath().toUri().toString()))
+                .map(fileStatus -> new 
Fragment(fileStatus.getPath().toUri().toString(), new HcfsFragmentMetadata(0, 
fileStatus.getLen())))
                 .collect(Collectors.toList());
         LOG.debug("Total number of fragments = {}", fragments.size());
 
diff --git a/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java
new file mode 100644
index 00000000..9673212e
--- /dev/null
+++ b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java
@@ -0,0 +1,41 @@
+package org.greenplum.pxf.plugins.json;
+
+import org.greenplum.pxf.api.model.ProtocolHandler;
+import org.greenplum.pxf.api.model.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+/**
+ * ProtocolHandler for the JSON profiles: when the IDENTIFIER option is non-empty (multi-line JSON) it swaps the profile's fragmenter for HdfsFileFragmenter, so each file is read as one whole fragment and a split can never start in the middle of a JSON object; the accessor and resolver are always the profile's own.
+ */
+public class JsonProtocolHandler implements ProtocolHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonProtocolHandler.class);
+    private static final String HCFS_FILE_FRAGMENTER = "org.greenplum.pxf.plugins.hdfs.HdfsFileFragmenter";
+
+    @Override
+    public String getFragmenterClassName(RequestContext context) {
+        String fragmenter = context.getFragmenter(); // default to fragmenter defined by the profile
+        if (useMultilineJson(context)) {
+            fragmenter = HCFS_FILE_FRAGMENTER; // whole-file fragments: a byte-range split could begin mid-object
+        }
+        LOG.debug("Determined to use {} fragmenter", fragmenter);
+        return fragmenter;
+    }
+
+    @Override
+    public String getAccessorClassName(RequestContext context) {
+        return context.getAccessor(); // always the accessor defined by the profile
+    }
+
+    @Override
+    public String getResolverClassName(RequestContext context) {
+        return context.getResolver(); // always the resolver defined by the profile
+    }
+
+    public boolean useMultilineJson(RequestContext context) { // true iff the IDENTIFIER option is set and non-empty (the tests set it as "IDENTIFIER")
+        return isNotEmpty(context.getOption("identifier"));
+    }
+}
diff --git a/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java
new file mode 100644
index 00000000..b90ab9c4
--- /dev/null
+++ b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java
@@ -0,0 +1,57 @@
+package org.greenplum.pxf.plugins.json;
+
+import org.greenplum.pxf.api.model.RequestContext;
+import org.greenplum.pxf.api.utilities.ColumnDescriptor;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JsonProtocolHandlerTest { // non-empty IDENTIFIER => file fragmenter; accessor/resolver always come from the profile
+
+    private static final String FILE_FRAGMENTER = "org.greenplum.pxf.plugins.hdfs.HdfsFileFragmenter";
+    private static final String DEFAULT_ACCESSOR = "default-accessor";
+    private static final String DEFAULT_RESOLVER = "default-resolver";
+    private static final String DEFAULT_FRAGMENTER = "default-fragmenter";
+    private JsonProtocolHandler handler;
+    private RequestContext context;
+
+    @BeforeEach
+    public void before() { // profile plugins are set from the constants so the fixture and the assertions cannot drift apart
+        handler = new JsonProtocolHandler();
+        context = new RequestContext();
+        context.setFragmenter(DEFAULT_FRAGMENTER);
+        context.setAccessor(DEFAULT_ACCESSOR);
+        context.setResolver(DEFAULT_RESOLVER);
+        List<ColumnDescriptor> columns = new ArrayList<>();
+        columns.add(new ColumnDescriptor("c1", 1, 0, "INT", null, true)); // actual args do not matter
+        columns.add(new ColumnDescriptor("c2", 2, 0, "INT", null, true)); // actual args do not matter
+        context.setTupleDescription(columns);
+    }
+
+    @Test
+    public void testIdentifierOptionMissing() { // no IDENTIFIER: every plugin is the profile's own
+        assertEquals(DEFAULT_FRAGMENTER, handler.getFragmenterClassName(context));
+        assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context));
+        assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context));
+    }
+
+    @Test
+    public void testWithEmptyIdentifier() { // an empty IDENTIFIER is treated the same as a missing one
+        context.addOption("IDENTIFIER", "");
+        assertEquals(DEFAULT_FRAGMENTER, handler.getFragmenterClassName(context));
+        assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context));
+        assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context));
+    }
+
+    @Test
+    public void testWithIdentifier() { // non-empty IDENTIFIER: only the fragmenter switches, to the whole-file one
+        context.addOption("IDENTIFIER", "c1");
+        assertEquals(FILE_FRAGMENTER, handler.getFragmenterClassName(context));
+        assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context));
+        assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context));
+    }
+}
diff --git a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java
index bd2e5ac4..145dea64 100644
--- a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java
+++ b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java
@@ -14,6 +14,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static 
org.greenplum.pxf.plugins.s3.S3SelectAccessor.FILE_HEADER_INFO_IGNORE;
 import static 
org.greenplum.pxf.plugins.s3.S3SelectAccessor.FILE_HEADER_INFO_USE;
 
@@ -54,7 +55,7 @@ public class S3ProtocolHandler implements ProtocolHandler {
     @Override
     public String getFragmenterClassName(RequestContext context) {
         String fragmenter = context.getFragmenter(); // default to fragmenter 
defined by the profile
-        if (useS3Select(context)) {
+        if (useS3Select(context) || useMultilineJson(context)) {
             fragmenter = HCFS_FILE_FRAGMENTER;
         }
         LOG.debug("Determined to use {} fragmenter", fragmenter);
@@ -127,6 +128,10 @@ public class S3ProtocolHandler implements ProtocolHandler {
         }
     }
 
+    public boolean useMultilineJson(RequestContext context) { // non-empty IDENTIFIER => getFragmenterClassName picks HCFS_FILE_FRAGMENTER; NOTE(review): duplicates JsonProtocolHandler.useMultilineJson, keep the two in sync
+        return isNotEmpty(context.getOption("identifier"));
+    }
+
     /**
      * For CSV or TEXT files, it returns true if the file has headers
      *
diff --git a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java
index 3092a26d..4456afa3 100644
--- a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java
+++ b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java
@@ -60,6 +60,8 @@ public class S3ProtocolHandlerTest {
     private static final String[] 
EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_RESOLVER, 
STRING_PASS_RESOLVER, STRING_PASS_RESOLVER, STRING_PASS_RESOLVER, 
DEFAULT_RESOLVER};
     private static final String[] 
EXPECTED_FRAGMENTER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_FRAGMENTER, 
FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, DEFAULT_FRAGMENTER};
 
+    private static final String[] EXPECTED_FRAGMENTER_MULTILINE = {FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER}; // presumably one entry per FORMATS element: a non-empty IDENTIFIER forces the file fragmenter for every format
+
     private S3ProtocolHandler handler;
     private RequestContext context;
 
@@ -415,6 +417,38 @@ public class S3ProtocolHandlerTest {
         verifyResolvers(context, EXPECTED_RESOLVER_GPDB_WRITABLE_OFF);
     }
 
+    @Test
+    public void testTextIdentifierAndSelectOff() { // S3 Select off: plain "off" accessors/resolvers, but IDENTIFIER still forces the file fragmenter
+        context.addOption("S3_SELECT", "off");
+        context.addOption("IDENTIFIER", "c1");
+        context.setOutputFormat(OutputFormat.TEXT);
+        verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_OFF);
+        verifyResolvers(context, EXPECTED_RESOLVER_TEXT_OFF);
+        verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE);
+    }
+
+    @Test
+    public void testTextIdentifierAndSelectOn() {
+        // S3_SELECT decides the accessor/resolver; the fragmenter is HCFS_FILE_FRAGMENTER either way because IDENTIFIER alone already selects it
+        context.addOption("S3_SELECT", "on");
+        context.addOption("IDENTIFIER", "c1");
+        context.setOutputFormat(OutputFormat.TEXT);
+        verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_ON);
+        verifyResolvers(context, EXPECTED_RESOLVER_TEXT_ON);
+        verifyFragmenters(context, EXPECTED_FRAGMENTER_TEXT_ON);
+    }
+
+    @Test
+    public void testTextIdentifierAndSelectAuto() {
+        // S3_SELECT=auto yields the "no benefit" accessors/resolvers, while IDENTIFIER still forces the file fragmenter for every format
+        context.addOption("S3_SELECT", "auto");
+        context.addOption("IDENTIFIER", "c1");
+        context.setOutputFormat(OutputFormat.TEXT);
+        verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_AUTO_NO_BENEFIT);
+        verifyResolvers(context, EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT);
+        verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE);
+    }
+
     private void verifyFragmenters(RequestContext context, String[] expected) {
         IntStream.range(0, FORMATS.length).forEach(i -> {
             context.setFormat(FORMATS[i]);
diff --git a/server/pxf-service/src/main/resources/pxf-profiles-default.xml b/server/pxf-service/src/main/resources/pxf-profiles-default.xml
index 2ebadf82..77255f33 100644
--- a/server/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/server/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -709,6 +709,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
     </profile>
     <profile>
         <name>hdfs:json</name>
@@ -723,6 +724,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
     </profile>
     <profile>
         <name>s3:json</name>
@@ -757,6 +759,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
         <protocol>adl</protocol>
     </profile>
     <profile>
@@ -772,6 +775,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
         <protocol>wasbs</protocol>
     </profile>
     <profile>
@@ -787,6 +791,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
         <protocol>gs</protocol>
     </profile>
     <profile>
@@ -802,6 +807,7 @@ under the License.
             <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
+        <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler>
     </profile>
 
     <!-- ==================== SEQUENCE FILE PROFILES ==================== -->


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to