This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry-pxf.git
commit 9133f90bd46d92a8fff941cf754eedeceeae4307 Author: Ashuka Xue <[email protected]> AuthorDate: Wed Sep 7 12:40:15 2022 -0700 Add JsonProtocolHandler to use HdfsFileFragmenter for multi-line JSON (#858) This commit fixes an issue that could occur with multi-line JSON files. Previously, it was possible for a fragment to improperly parse a JSON object if the split started in the middle of a string, causing wrong results. This commit now uses the HdfsFileFragmenter for multi-line JSON files. --- .../pxf/plugins/hdfs/HdfsFileFragmenter.java | 2 +- .../pxf/plugins/json/JsonProtocolHandler.java | 41 ++++++++++++++++ .../pxf/plugins/json/JsonProtocolHandlerTest.java | 57 ++++++++++++++++++++++ .../pxf/plugins/s3/S3ProtocolHandler.java | 7 ++- .../pxf/plugins/s3/S3ProtocolHandlerTest.java | 34 +++++++++++++ .../src/main/resources/pxf-profiles-default.xml | 6 +++ 6 files changed, 145 insertions(+), 2 deletions(-) diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java index 26164449..efcb746c 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsFileFragmenter.java @@ -47,7 +47,7 @@ public class HdfsFileFragmenter extends HdfsDataFragmenter { } fragments = Arrays.stream(fileStatusArray) - .map(fileStatus -> new Fragment(fileStatus.getPath().toUri().toString())) + .map(fileStatus -> new Fragment(fileStatus.getPath().toUri().toString(), new HcfsFragmentMetadata(0, fileStatus.getLen()))) .collect(Collectors.toList()); LOG.debug("Total number of fragments = {}", fragments.size()); diff --git a/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java new file mode 100644 index 00000000..9673212e --- /dev/null +++ 
b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java @@ -0,0 +1,41 @@ +package org.greenplum.pxf.plugins.json; + +import org.greenplum.pxf.api.model.ProtocolHandler; +import org.greenplum.pxf.api.model.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.commons.lang.StringUtils.isNotEmpty; + +/** + * Implementation of ProtocolHandler for json protocol. + */ +public class JsonProtocolHandler implements ProtocolHandler { + + private static final Logger LOG = LoggerFactory.getLogger(JsonProtocolHandler.class); + private static final String HCFS_FILE_FRAGMENTER = "org.greenplum.pxf.plugins.hdfs.HdfsFileFragmenter"; + + @Override + public String getFragmenterClassName(RequestContext context) { + String fragmenter = context.getFragmenter(); // default to fragmenter defined by the profile + if (useMultilineJson(context)) { + fragmenter = HCFS_FILE_FRAGMENTER; + } + LOG.debug("Determined to use {} fragmenter", fragmenter); + return fragmenter; + } + + @Override + public String getAccessorClassName(RequestContext context) { + return context.getAccessor(); + } + + @Override + public String getResolverClassName(RequestContext context) { + return context.getResolver(); + } + + public boolean useMultilineJson(RequestContext context) { + return isNotEmpty(context.getOption("identifier")); + } +} diff --git a/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java new file mode 100644 index 00000000..b90ab9c4 --- /dev/null +++ b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java @@ -0,0 +1,57 @@ +package org.greenplum.pxf.plugins.json; + +import org.greenplum.pxf.api.model.RequestContext; +import org.greenplum.pxf.api.utilities.ColumnDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import 
package org.greenplum.pxf.plugins.json;

import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.utilities.ColumnDescriptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests for {@link JsonProtocolHandler}: the accessor and resolver must
 * always come from the profile defaults, while the fragmenter switches to the
 * HDFS file fragmenter only when a non-empty IDENTIFIER option is present.
 */
public class JsonProtocolHandlerTest {

    private static final String FILE_FRAGMENTER = "org.greenplum.pxf.plugins.hdfs.HdfsFileFragmenter";
    private static final String DEFAULT_ACCESSOR = "default-accessor";
    private static final String DEFAULT_RESOLVER = "default-resolver";
    private static final String DEFAULT_FRAGMENTER = "default-fragmenter";
    private JsonProtocolHandler handler;
    private RequestContext context;

    @BeforeEach
    public void before() {
        handler = new JsonProtocolHandler();
        context = new RequestContext();
        // configure the context from the same constants the assertions use,
        // so expected and configured values cannot silently drift apart
        context.setFragmenter(DEFAULT_FRAGMENTER);
        context.setAccessor(DEFAULT_ACCESSOR);
        context.setResolver(DEFAULT_RESOLVER);
        List<ColumnDescriptor> columns = new ArrayList<>();
        columns.add(new ColumnDescriptor("c1", 1, 0, "INT", null, true)); // actual args do not matter
        columns.add(new ColumnDescriptor("c2", 2, 0, "INT", null, true)); // actual args do not matter
        context.setTupleDescription(columns);
    }

    @Test
    public void testTextIdentifierOptionMissing() {
        assertPluginClasses(DEFAULT_FRAGMENTER);
    }

    @Test
    public void testWithEmptyIdentifier() {
        context.addOption("IDENTIFIER", "");
        assertPluginClasses(DEFAULT_FRAGMENTER);
    }

    @Test
    public void testWithIdentifier() {
        context.addOption("IDENTIFIER", "c1");
        assertPluginClasses(FILE_FRAGMENTER);
    }

    /**
     * Asserts the plugin classes the handler selects for the current context.
     * Only the fragmenter varies by scenario; accessor and resolver are
     * always the profile defaults.
     */
    private void assertPluginClasses(String expectedFragmenter) {
        assertEquals(expectedFragmenter, handler.getFragmenterClassName(context));
        assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context));
        assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context));
    }
}
diff --git a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java index bd2e5ac4..145dea64 100644 --- a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java +++ b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java @@ -14,6 +14,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.greenplum.pxf.plugins.s3.S3SelectAccessor.FILE_HEADER_INFO_IGNORE; import static org.greenplum.pxf.plugins.s3.S3SelectAccessor.FILE_HEADER_INFO_USE; @@ -54,7 +55,7 @@ public class S3ProtocolHandler implements ProtocolHandler { @Override public String getFragmenterClassName(RequestContext context) { String fragmenter = context.getFragmenter(); // default to fragmenter defined by the profile - if (useS3Select(context)) { + if (useS3Select(context) || useMultilineJson(context)) { fragmenter = HCFS_FILE_FRAGMENTER; } LOG.debug("Determined to use {} fragmenter", fragmenter); @@ -127,6 +128,10 @@ public class S3ProtocolHandler implements ProtocolHandler { } } + public boolean useMultilineJson(RequestContext context) { + return isNotEmpty(context.getOption("identifier")); + } + /** * For CSV or TEXT files, it returns true if the file has headers * diff --git a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java index 3092a26d..4456afa3 100644 --- a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java +++ b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java @@ -60,6 +60,8 @@ public class S3ProtocolHandlerTest { private static final String[] EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_RESOLVER, STRING_PASS_RESOLVER, STRING_PASS_RESOLVER, 
STRING_PASS_RESOLVER, DEFAULT_RESOLVER}; private static final String[] EXPECTED_FRAGMENTER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, DEFAULT_FRAGMENTER}; + private static final String[] EXPECTED_FRAGMENTER_MULTILINE = {FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER}; + private S3ProtocolHandler handler; private RequestContext context; @@ -415,6 +417,38 @@ public class S3ProtocolHandlerTest { verifyResolvers(context, EXPECTED_RESOLVER_GPDB_WRITABLE_OFF); } + @Test + public void testTextIdentifierAndSelectOff() { + context.addOption("S3_SELECT", "off"); + context.addOption("IDENTIFIER", "c1"); + context.setOutputFormat(OutputFormat.TEXT); + verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_OFF); + verifyResolvers(context, EXPECTED_RESOLVER_TEXT_OFF); + verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE); + } + + @Test + public void testTextIdentifierAndSelectOn() { + // s3 options should override multiline json fragmenter option + context.addOption("S3_SELECT", "on"); + context.addOption("IDENTIFIER", "c1"); + context.setOutputFormat(OutputFormat.TEXT); + verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_ON); + verifyResolvers(context, EXPECTED_RESOLVER_TEXT_ON); + verifyFragmenters(context, EXPECTED_FRAGMENTER_TEXT_ON); + } + + @Test + public void testTextIdentifierAndSelectAuto() { + // s3 options should override multiline json fragmenter option + context.addOption("S3_SELECT", "auto"); + context.addOption("IDENTIFIER", "c1"); + context.setOutputFormat(OutputFormat.TEXT); + verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_AUTO_NO_BENEFIT); + verifyResolvers(context, EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT); + verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE); + } + private void verifyFragmenters(RequestContext context, String[] expected) { IntStream.range(0, FORMATS.length).forEach(i -> { context.setFormat(FORMATS[i]); diff --git 
a/server/pxf-service/src/main/resources/pxf-profiles-default.xml b/server/pxf-service/src/main/resources/pxf-profiles-default.xml index 2ebadf82..77255f33 100644 --- a/server/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/server/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -709,6 +709,7 @@ under the License. <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> </profile> <profile> <name>hdfs:json</name> @@ -723,6 +724,7 @@ under the License. <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> </profile> <profile> <name>s3:json</name> @@ -757,6 +759,7 @@ under the License. <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> <protocol>adl</protocol> </profile> <profile> @@ -772,6 +775,7 @@ under the License. <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> <protocol>wasbs</protocol> </profile> <profile> @@ -787,6 +791,7 @@ under the License. <accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> <protocol>gs</protocol> </profile> <profile> @@ -802,6 +807,7 @@ under the License. 
<accessor>org.greenplum.pxf.plugins.json.JsonAccessor</accessor> <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> + <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> </profile> <!-- ==================== SEQUENCE FILE PROFILES ==================== --> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
