This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 17f3654 DRILL-1282: Add read and write support for Parquet v2 (#2351)
17f3654 is described below
commit 17f3654919b8429b12e321fb4ee837b7a52e06f1
Author: James Turton <[email protected]>
AuthorDate: Tue Nov 23 17:30:36 2021 +0200
DRILL-1282: Add read and write support for Parquet v2 (#2351)
* Add an option for Parquet v2 writing.
* Finish adding Parquet v2 write support.
* Fix LGTM error inadvertently merged in #2366.
* Add parquet format config options for version and various other parameters.
* Fix persistence and application of config opts.
* Fix TestFormatPluginOptionExtractor failure.
* Add a test of parquet format config writer options.
* Fix checkstyle violations.
* Address review comments.
* Upgrade TestParquetWriter to ClusterTest to make a home for the parquet format config test.
* Change Parquet version strings to "v1", "v2".
* Convert comment containing an @link to Javadoc.
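For illustration, a minimal usage sketch of the new option (an editorial aside, not part of this commit; it assumes the ClusterTest fixtures used by TestParquetWriter below):

    // Write a table as Parquet v2 for the current session, then restore the
    // "v1" default from drill-module.conf. The table name t_v2 is hypothetical.
    client.alterSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION, "v2");
    run("create table dfs.tmp.t_v2 as select * from cp.`tpch/supplier.parquet`");
    client.resetSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION);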
---
.../org/apache/drill/categories/ParquetTest.java | 4 +
.../java/org/apache/drill/exec/ExecConstants.java | 11 +
.../drill/exec/server/options/OptionValue.java | 10 +
.../exec/server/options/SystemOptionManager.java | 1 +
.../drill/exec/server/rest/StorageResources.java | 2 +-
.../exec/store/parquet/ParquetFormatConfig.java | 129 +++++---
.../exec/store/parquet/ParquetFormatPlugin.java | 103 ++++--
.../drill/exec/store/parquet/ParquetGroupScan.java | 4 +-
.../exec/store/parquet/ParquetReaderConfig.java | 15 +-
.../exec/store/parquet/ParquetRecordWriter.java | 21 +-
.../java-exec/src/main/resources/drill-module.conf | 1 +
.../physical/impl/writer/TestParquetWriter.java | 355 ++++++++++++---------
.../store/dfs/TestFormatPluginOptionExtractor.java | 9 +-
.../store/parquet/TestParquetReaderConfig.java | 31 +-
.../exec/store/parquet/TestVarlenDecimal.java | 9 +-
15 files changed, 452 insertions(+), 253 deletions(-)
diff --git a/common/src/test/java/org/apache/drill/categories/ParquetTest.java b/common/src/test/java/org/apache/drill/categories/ParquetTest.java
index aafaeb4..b7d7668 100644
--- a/common/src/test/java/org/apache/drill/categories/ParquetTest.java
+++ b/common/src/test/java/org/apache/drill/categories/ParquetTest.java
@@ -21,4 +21,8 @@ package org.apache.drill.categories;
* This is a category used to mark unit tests that test Drill Parquet support.
*/
public interface ParquetTest {
+ /**
+ * tag for JUnit5
+ */
+ String TAG = "parquet-test";
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 16d21b5..ef5cd3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec;
+import java.util.Arrays;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
@@ -36,6 +37,7 @@ import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
import org.apache.drill.exec.server.options.TypeValidators.RangeLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
+import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.vector.ValueVector;
@@ -375,6 +377,15 @@ public final class ExecConstants {
public static final OptionValidator PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS_VALIDATOR = new EnumeratedStringValidator(PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
new OptionDescription("Parquet writer logical type for decimal; supported types \'fixed_len_byte_array\' and \'binary\'"),
"fixed_len_byte_array", "binary");
+ public static final String PARQUET_WRITER_FORMAT_VERSION = "store.parquet.writer.format_version";
+ public static final OptionValidator PARQUET_WRITER_FORMAT_VERSION_VALIDATOR = new EnumeratedStringValidator(
+ PARQUET_WRITER_FORMAT_VERSION,
+ new OptionDescription(
+ "Parquet format version used for storing Parquet output. Allowed values:" +
+ Arrays.toString(ParquetFormatPlugin.PARQUET_FORMAT_VERSIONS)
+ ),
+ ParquetFormatPlugin.PARQUET_FORMAT_VERSIONS
+ );
// TODO - The below two options don't seem to be used in the Drill code base
@Deprecated // TODO: DRILL-6527
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index e74f46a..b00cfce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -240,6 +240,16 @@ public class OptionValue implements Comparable<OptionValue> {
}
}
+ /**
+ * Gets the value of this option if it exists at a scope at least as narrow as the given scope.
+ * @param minScope the broadest scope at which the option may be set and still have its value returned
+ * @return null if the option does not exist at a scope at least as narrow as minScope
+ */
+ @JsonIgnore
+ public Object getValueMinScope(OptionScope minScope) {
+ return scope.compareTo(minScope) >= 0 ? getValue() : null;
+ }
+
@JsonIgnore
public OptionScope getScope() {
return scope;
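An illustrative aside on the accessor added above, assuming Drill's OptionScope values compare by breadth (BOOT < SYSTEM < SESSION < QUERY via compareTo):

    // Returns a value only when the option was set at SESSION or QUERY scope;
    // a SYSTEM-scoped or default value yields null.
    OptionValue opt = options.getOption(ExecConstants.PARQUET_BLOCK_SIZE);
    Object v = opt.getValueMinScope(OptionValue.OptionScope.SESSION);
    if (v == null) {
      // fall back to the format plugin config or the system default
    }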
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index fa3c3f5..0122fe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -172,6 +172,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
new OptionDefinition(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS_VALIDATOR),
+ new OptionDefinition(ExecConstants.PARQUET_WRITER_FORMAT_VERSION_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index a6c958c..dcba516 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -271,7 +271,7 @@ public class StorageResources {
} catch (PluginException e) {
logger.error("Error while saving plugin", e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(message("Error while saving plugin: ", e.getMessage()))
+ .entity(message("Error while saving plugin: %s", e.getMessage()))
.build();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 4370b92..4367dc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -18,81 +18,110 @@
package org.apache.drill.exec.store.parquet;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Objects;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
+@EqualsAndHashCode
@JsonTypeName("parquet") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class ParquetFormatConfig implements FormatPluginConfig {
- private final boolean autoCorrectCorruptDates;
- private final boolean enableStringsSignedMinMax;
-
- public ParquetFormatConfig() {
- this(true, false);
- }
-
- @JsonCreator
- public ParquetFormatConfig(@JsonProperty("autoCorrectCorruptDates") Boolean autoCorrectCorruptDates,
- @JsonProperty("enableStringsSignedMinMax") boolean enableStringsSignedMinMax) {
- this.autoCorrectCorruptDates = autoCorrectCorruptDates == null ? true : autoCorrectCorruptDates;
- this.enableStringsSignedMinMax = enableStringsSignedMinMax;
- }
-
/**
- * @return true if auto correction of corrupt dates is enabled, false otherwise
+ * Until DRILL-4203 was resolved, Drill could write non-standard dates into
+ * parquet files. This issue is related to all Drill releases where {@link
+ * org.apache.drill.exec.store.parquet.ParquetRecordWriter#WRITER_VERSION_PROPERTY}
+ * < {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#DRILL_WRITER_VERSION_STD_DATE_FORMAT}.
+
+ * The values have been read correctly by Drill, but external tools like
+ * Spark reading the files will see corrupted values for all dates that
+ * have been written by Drill. To maintain compatibility with old files,
+ * the parquet reader code has been given the ability to check for the
+ * old format and automatically shift the corrupted values into corrected ones.
*/
- @JsonIgnore
- public boolean areCorruptDatesAutoCorrected() {
- return autoCorrectCorruptDates;
- }
+ @Getter private final boolean autoCorrectCorruptDates;
/**
- * Parquet statistics for UTF-8 data for files created prior to 1.9.1 parquet library version was stored incorrectly.
- * If user exactly knows that data in binary columns is in ASCII (not UTF-8), turning this property to 'true'
- * enables statistics usage for varchar and decimal columns.
- *
- * Can be overridden for individual tables using
- * @link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX} session option.
+ * Parquet statistics for UTF-8 data in files created prior to 1.9.1 parquet
+ * library version were stored incorrectly. If the user exactly knows that
+ * data in binary columns is in ASCII (not UTF-8), turning this property to
+ * 'true' enables statistics usage for varchar and decimal columns.
*
- * @return true if string signed min max enabled, false otherwise
+ * {@link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX}
*/
- @JsonIgnore
- public boolean isStringsSignedMinMaxEnabled() {
- return enableStringsSignedMinMax;
- }
+ @Getter private final boolean enableStringsSignedMinMax;
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ParquetFormatConfig that = (ParquetFormatConfig) o;
- return Objects.equals(autoCorrectCorruptDates, that.autoCorrectCorruptDates) &&
- Objects.equals(enableStringsSignedMinMax, that.enableStringsSignedMinMax);
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_BLOCK_SIZE}
+ @Getter private final Integer blockSize;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_PAGE_SIZE}
+ @Getter private final Integer pageSize;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_USE_SINGLE_FS_BLOCK}
+ @Getter private final Boolean useSingleFSBlock;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_COMPRESSION_TYPE}
+ @Getter private final String writerCompressionType;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS}
+ @Getter private final String writerLogicalTypeForDecimals;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS}
+ @Getter private final Boolean writerUsePrimitivesForDecimals;
+
+ // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_FORMAT_VERSION}
+ @Getter private final String writerFormatVersion;
+
+ public ParquetFormatConfig() {
+ // config opts which are also system opts must default to null so as not
+ // to override system opts.
+ this(true, false, null, null, null, null, null, null, null);
}
- @Override
- public int hashCode() {
- return Objects.hash(autoCorrectCorruptDates, enableStringsSignedMinMax);
+ @JsonCreator
+ @Builder
+ public ParquetFormatConfig(
+ @JsonProperty("autoCorrectCorruptDates") Boolean autoCorrectCorruptDates,
+ @JsonProperty("enableStringsSignedMinMax") boolean enableStringsSignedMinMax,
+ @JsonProperty("blockSize") Integer blockSize,
+ @JsonProperty("pageSize") Integer pageSize,
+ @JsonProperty("useSingleFSBlock") Boolean useSingleFSBlock,
+ @JsonProperty("writerCompressionType") String writerCompressionType,
+ @JsonProperty("writerLogicalTypeForDecimals") String writerLogicalTypeForDecimals,
+ @JsonProperty("writerUsePrimitivesForDecimals") Boolean writerUsePrimitivesForDecimals,
+ @JsonProperty("writerFormatVersion") String writerFormatVersion
+ ) {
+ this.autoCorrectCorruptDates = autoCorrectCorruptDates == null ? true : autoCorrectCorruptDates;
+ this.enableStringsSignedMinMax = enableStringsSignedMinMax;
+ this.blockSize = blockSize;
+ this.pageSize = pageSize;
+ this.useSingleFSBlock = useSingleFSBlock;
+ this.writerCompressionType = writerCompressionType;
+ this.writerLogicalTypeForDecimals = writerLogicalTypeForDecimals;
+ this.writerUsePrimitivesForDecimals = writerUsePrimitivesForDecimals;
+ this.writerFormatVersion = writerFormatVersion;
}
@Override
public String toString() {
return new PlanStringBuilder(this)
- .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
- .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
- .toString();
+ .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
+ .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
+ .field("blockSize", blockSize)
+ .field("pageSize", pageSize)
+ .field("useSingleFSBlock", useSingleFSBlock)
+ .field("writerCompressionType", writerCompressionType)
+ .field("writerLogicalTypeForDecimals", writerLogicalTypeForDecimals)
+ .field("writerUsePrimitivesForDecimals", writerUsePrimitivesForDecimals)
+ .field("writerFormatVersion", writerFormatVersion)
+ .toString();
}
}
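A hedged sketch of the Lombok-generated builder this class now exposes (illustration only); the setter names follow the @JsonProperty names above, and writer options left unset remain null so they defer to Drill's system/session options:

    ParquetFormatConfig config = ParquetFormatConfig.builder()
        .writerCompressionType("snappy") // illustrative values only
        .writerFormatVersion("v2")
        .build(); // all other writer opts stay null and defer to Drill options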
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9129fc8..b1a614f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -49,6 +51,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -78,6 +81,9 @@ import org.slf4j.LoggerFactory;
public class ParquetFormatPlugin implements FormatPlugin {
+ /** {@link org.apache.parquet.column.ParquetProperties.WriterVersion} */
+ public static final String[] PARQUET_FORMAT_VERSIONS = { "v1", "v2" };
+
private static final Logger logger = LoggerFactory.getLogger(ParquetFormatPlugin.class);
public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -141,34 +147,87 @@ public class ParquetFormatPlugin implements FormatPlugin {
}
public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
- Map<String, String> options = new HashMap<>();
+ Map<String, String> writerOpts = new HashMap<>();
+ OptionManager contextOpts = context.getOptions();
- options.put("location", writer.getLocation());
+ writerOpts.put("location", writer.getLocation());
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
- options.put("prefix", fragmentId);
-
- options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
- options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
- context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
- options.put(ExecConstants.PARQUET_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_PAGE_SIZE).num_val.toString());
- options.put(ExecConstants.PARQUET_DICT_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_DICT_PAGE_SIZE).num_val.toString());
-
- options.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
- context.getOptions().getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val);
-
- options.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
- context.getOptions().getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString());
-
- options.put(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
- context.getOptions().getOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS).string_val);
-
- options.put(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS,
- context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS).bool_val.toString());
+ writerOpts.put("prefix", fragmentId);
+
+ // Many options which follow may be set as Drill config options or in the parquet format
+ // plugin config. If there is a Drill option set at session scope or narrower, it takes precedence.
+ OptionValue.OptionScope minScope = OptionValue.OptionScope.SESSION;
+
+ writerOpts.put(ExecConstants.PARQUET_BLOCK_SIZE,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_BLOCK_SIZE).getValueMinScope(minScope),
+ config.getBlockSize(),
+ contextOpts.getInt(ExecConstants.PARQUET_BLOCK_SIZE)
+ ).toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).getValueMinScope(minScope),
+ config.getUseSingleFSBlock(),
+ contextOpts.getBoolean(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK)
+ ).toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_PAGE_SIZE,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_PAGE_SIZE).getValueMinScope(minScope),
+ config.getPageSize(),
+ contextOpts.getInt(ExecConstants.PARQUET_PAGE_SIZE)
+ ).toString()
+ );
+
+ // "internal use" so not settable in format config
+ writerOpts.put(ExecConstants.PARQUET_DICT_PAGE_SIZE,
+ contextOpts.getOption(ExecConstants.PARQUET_DICT_PAGE_SIZE).num_val.toString()
+ );
+
+ // "internal use" so not settable in format config
+ writerOpts.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).getValueMinScope(minScope),
+ config.getWriterCompressionType(),
+ contextOpts.getString(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)
+ ).toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS).getValueMinScope(minScope),
+ config.getWriterLogicalTypeForDecimals(),
+ contextOpts.getString(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS)
+ ).toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS).getValueMinScope(minScope),
+ config.getWriterUsePrimitivesForDecimals(),
+ contextOpts.getBoolean(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS)
+ ).toString()
+ );
+
+ writerOpts.put(ExecConstants.PARQUET_WRITER_FORMAT_VERSION,
+ ObjectUtils.firstNonNull(
+ contextOpts.getOption(ExecConstants.PARQUET_WRITER_FORMAT_VERSION).getValueMinScope(minScope),
+ config.getWriterFormatVersion(),
+ contextOpts.getString(ExecConstants.PARQUET_WRITER_FORMAT_VERSION)
+ ).toString()
+ );
RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
- recordWriter.init(options);
+ recordWriter.init(writerOpts);
return recordWriter;
}
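The blocks above repeat a three-level resolution for each writer option. A hypothetical helper (illustration only, not in the commit) makes the precedence explicit:

    // (1) session/query-scoped Drill option value, else (2) the parquet format
    // plugin config, else (3) the system/default Drill option value.
    private static String resolveWriterOpt(Object sessionScoped, Object fromFormatConfig, Object systemDefault) {
      return ObjectUtils.firstNonNull(sessionScoped, fromFormatConfig, systemDefault).toString();
    }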
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4d5e178..1af28d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -99,7 +99,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
.withCacheFileRoot(cacheFileRoot)
.withReaderConfig(readerConfig)
.withFileSystem(fs)
- .withCorrectCorruptedDates(this.formatConfig.areCorruptDatesAutoCorrected())
+ .withCorrectCorruptedDates(this.formatConfig.isAutoCorrectCorruptDates())
.withSchema(schema)
.build();
@@ -146,7 +146,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
.withSelection(selection)
.withReaderConfig(readerConfig)
.withFileSystem(fs)
- .withCorrectCorruptedDates(formatConfig.areCorruptDatesAutoCorrected())
+ .withCorrectCorruptedDates(formatConfig.isAutoCorrectCorruptDates())
.build();
this.usedMetadataCache = metadataProvider.isUsedMetadataCache();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
index 3b668c7..c7a3db5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetReadOptions;
@@ -175,8 +176,8 @@ public class ParquetReaderConfig {
// first assign configuration values from format config
if (formatConfig != null) {
- readerConfig.autoCorrectCorruptedDates = formatConfig.areCorruptDatesAutoCorrected();
- readerConfig.enableStringsSignedMinMax = formatConfig.isStringsSignedMinMaxEnabled();
+ readerConfig.autoCorrectCorruptedDates = formatConfig.isAutoCorrectCorruptDates();
+ readerConfig.enableStringsSignedMinMax = formatConfig.isEnableStringsSignedMinMax();
}
// then assign configuration values from Hadoop configuration
@@ -186,11 +187,13 @@ public class ParquetReaderConfig {
readerConfig.enableTimeReadCounter = conf.getBoolean(ENABLE_TIME_READ_COUNTER, readerConfig.enableTimeReadCounter);
}
- // last assign values from session options, session options have higher priority than other configurations
+ // last assign values from session or query scoped options which have higher priority than other configurations
if (options != null) {
- String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
- if (!option.isEmpty()) {
- readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
+ String optVal = (String) options.getOption(
+ ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX
+ ).getValueMinScope(OptionValue.OptionScope.SESSION);
+ if (optVal != null && !optVal.isEmpty()) {
+ readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optVal);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 53a228f..a4b68da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -62,7 +62,10 @@ import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -206,6 +209,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
useSingleFSBlock = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK));
usePrimitiveTypesForDecimals = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS));
+ writerVersion = WriterVersion.fromString(
+ writerOptions.get(ExecConstants.PARQUET_WRITER_FORMAT_VERSION)
+ );
if (useSingleFSBlock) {
// Round up blockSize to multiple of 64K.
@@ -263,20 +269,29 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
// We don't want this number to be too small either. Ideally, slightly bigger than the page size,
// but not bigger than the block buffer
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+ ValuesWriterFactory valWriterFactory = writerVersion == WriterVersion.PARQUET_1_0
+ ? new DefaultV1ValuesWriterFactory()
+ : new DefaultV2ValuesWriterFactory();
+
ParquetProperties parquetProperties = ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryEncoding(enableDictionary)
.withDictionaryPageSize(initialPageBufferSize)
- .withWriterVersion(writerVersion)
.withAllocator(new ParquetDirectByteBufferAllocator(oContext))
- .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+ .withValuesWriterFactory(valWriterFactory)
+ .withWriterVersion(writerVersion)
.build();
+
// TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
// once DRILL-7906 (PARQUET-1006) will be resolved
pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(), parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
- store = new ColumnWriteStoreV1(pageStore, parquetProperties);
+
+ store = writerVersion == WriterVersion.PARQUET_1_0
+ ? new ColumnWriteStoreV1(schema, pageStore, parquetProperties)
+ : new ColumnWriteStoreV2(schema, pageStore, parquetProperties);
+
MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
setUp(schema, consumer);
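For context, a short editorial sketch of the version switch wired above: parquet-mr's WriterVersion.fromString accepts the "v1"/"v2" spellings used by the new option, and each version pairs with its own values-writer factory and column write store:

    WriterVersion version = WriterVersion.fromString("v2"); // "v1" selects PARQUET_1_0
    ValuesWriterFactory factory = version == WriterVersion.PARQUET_1_0
        ? new DefaultV1ValuesWriterFactory()
        : new DefaultV2ValuesWriterFactory();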
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a8b1ed3..9160857 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -726,6 +726,7 @@ drill.exec.options: {
store.parquet.writer.use_primitive_types_for_decimals: true,
store.parquet.writer.logical_type_for_decimals: "fixed_len_byte_array",
store.parquet.writer.use_single_fs_block: false,
+ store.parquet.writer.format_version: "v1",
store.parquet.flat.reader.bulk: true,
store.parquet.flat.batch.num_records: 32767,
store.parquet.complex.batch.num_records: 4000,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 093ddd8..cc6c2d1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -21,14 +21,18 @@ import org.apache.calcite.util.Pair;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.fn.interp.TestConstantFolding;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.TestBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +41,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.Period;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -74,11 +79,11 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
@RunWith(Parameterized.class)
@Category({SlowTest.class, ParquetTest.class})
-public class TestParquetWriter extends BaseTestQuery {
+public class TestParquetWriter extends ClusterTest {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { {100} });
@@ -89,8 +94,6 @@ public class TestParquetWriter extends BaseTestQuery {
dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "int96_dict_change"));
}
- private static FileSystem fs;
-
// Map storing a convenient name as well as the cast type necessary
// to produce it casting from a varchar
private static final Map<String, String> allTypes = new HashMap<>();
@@ -137,14 +140,14 @@ public class TestParquetWriter extends BaseTestQuery {
public int repeat = 1;
@BeforeClass
- public static void initFs() throws Exception {
- fs = getLocalFileSystem();
- alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+ public static void setUp() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ client.alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
}
@AfterClass
public static void disableDecimalDataType() {
- resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+ client.resetSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
}
@Test
@@ -187,8 +190,7 @@ public class TestParquetWriter extends BaseTestQuery {
new TestConstantFolding.SmallFileCreator(pathDir)
.setRecord(sb.toString()).createFiles(1, 1, "json");
- test("use dfs.tmp");
- test("create table WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter as
select * from dfs.`%s/smallfile/smallfile.json`", path);
+ run("create table
dfs.tmp.WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter as select * from
dfs.`%s/smallfile/smallfile.json`", path);
testBuilder()
.sqlQuery("select * from
dfs.tmp.WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter")
.unOrdered()
@@ -204,26 +206,26 @@ public class TestParquetWriter extends BaseTestQuery {
try {
// read all of the types with the complex reader
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@Test
public void testAllScalarTypesDictionary() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
+ client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
/// read once with the flat reader
runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
// read all of the types with the complex reader
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
- resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
}
}
@@ -238,10 +240,10 @@ public class TestParquetWriter extends BaseTestQuery {
String selection = "type";
String inputTable = "cp.`donuts.json`";
try {
- alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
+ client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
runTestAndValidate(selection, selection, inputTable, "donuts_json");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+ client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
}
}
@@ -279,7 +281,7 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
public void testTPCHReadWrite1_date_convertedType() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
"L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
@@ -287,7 +289,7 @@ public class TestParquetWriter extends BaseTestQuery {
String inputTable = "cp.`tpch/lineitem.parquet`";
runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet_converted");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+ client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
}
}
@@ -336,24 +338,24 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
public void testTPCHReadWriteNoDictUncompressed() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "none");
+ client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "none");
String inputTable = "cp.`tpch/supplier.parquet`";
runTestAndValidate("*", "*", inputTable, "supplier_parquet_no_dict_uncompressed");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
@Test
public void testTPCHReadWriteDictGzip() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
String inputTable = "cp.`tpch/supplier.parquet`";
runTestAndValidate("*", "*", inputTable, "supplier_parquet_dict_gzip");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
@@ -409,24 +411,24 @@ public class TestParquetWriter extends BaseTestQuery {
// the old and new readers.
try {
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@Test
public void testMulipleRowGroups() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024);
+ client.alterSession(ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024);
String selection = "mi";
String inputTable = "cp.`customer.json`";
runTestAndValidate(selection, selection, inputTable, "foodmart_customer_parquet");
} finally {
- resetSessionOption(ExecConstants.PARQUET_BLOCK_SIZE);
+ client.resetSession(ExecConstants.PARQUET_BLOCK_SIZE);
}
}
@@ -462,15 +464,15 @@ public class TestParquetWriter extends BaseTestQuery {
String queryFromWriteOut = "select * from " + outputFile;
try {
- test("use dfs.tmp");
- test(ctasStmt);
+ run("use dfs.tmp");
+ run(ctasStmt);
testBuilder()
.ordered()
.sqlQuery(queryFromWriteOut)
.sqlBaselineQuery(query)
.build().run();
} finally {
- deleteTableIfExists(outputFile);
+ run("drop table if exists dfs.tmp.%s", outputFile);
}
}
@@ -486,7 +488,7 @@ public class TestParquetWriter extends BaseTestQuery {
.optionSettingQueriesForBaseline("alter system set
`store.parquet.use_new_reader` = true")
.build().run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@@ -505,7 +507,7 @@ public class TestParquetWriter extends BaseTestQuery {
"alter system set `store.parquet.use_new_reader` = true")
.build().run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@@ -595,8 +597,7 @@ public class TestParquetWriter extends BaseTestQuery {
String outputTable = "decimal_test";
try {
- test("use dfs.tmp; " +
- "create table %s as select " +
+ run("create table dfs.tmp.%s as select " +
"cast('1.2' as decimal(38, 2)) col1, cast('1.2' as decimal(28, 2))
col2 " +
"from cp.`employee.json` limit 1", outputTable);
@@ -604,12 +605,12 @@ public class TestParquetWriter extends BaseTestQuery {
testBuilder()
.unOrdered()
- .sqlQuery("select col1, col2 from %s ", outputTable)
+ .sqlQuery("select col1, col2 from dfs.tmp.%s ", outputTable)
.baselineColumns("col1", "col2")
.baselineValues(result, result)
.go();
} finally {
- deleteTableIfExists(outputTable);
+ run("drop table if exists dfs.tmp.%s", outputTable);
}
}
@@ -619,7 +620,7 @@ public class TestParquetWriter extends BaseTestQuery {
final String newTblName = "testTableOutputSchema";
try {
- test("CREATE TABLE dfs.tmp.%s(id, name, bday) AS SELECT " +
+ run("CREATE TABLE dfs.tmp.%s(id, name, bday) AS SELECT " +
"cast(`employee_id` as integer), " +
"cast(`full_name` as varchar(100)), " +
"cast(`birth_date` as date) " +
@@ -632,7 +633,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(1, "Sheri Nowmer", LocalDate.parse("1961-08-26"))
.go();
} finally {
- deleteTableIfExists(newTblName);
+ run("drop table if exists dfs.tmp.%s", newTblName);
}
}
@@ -642,11 +643,11 @@ public class TestParquetWriter extends BaseTestQuery {
*/
@Test
public void testCTASWithIntervalTypes() throws Exception { // TODO: investigate NPE errors during the test execution
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "drill_1980_t1";
// test required interval day type
- test("create table %s as " +
+ run("create table %s as " +
"select " +
"interval '10 20:30:40.123' day to second col1, " +
"interval '-1000000000 20:12:23.999' day(10) to second col2 " +
@@ -659,7 +660,7 @@ public class TestParquetWriter extends BaseTestQuery {
tableName = "drill_1980_2";
// test required interval year type
- test("create table %s as " +
+ run("create table %s as " +
"select " +
"interval '10-2' year to month col1, " +
"interval '-100-8' year(3) to month col2 " +
@@ -671,7 +672,7 @@ public class TestParquetWriter extends BaseTestQuery {
testParquetReaderHelper(tableName, row1Col1, row1Col2, row1Col1, row1Col2);
// test nullable interval year type
tableName = "drill_1980_t3";
- test("create table %s as " +
+ run("create table %s as " +
"select " +
"cast (intervalyear_col as interval year) col1," +
"cast(intervalyear_col as interval year) + interval '2' year col2 " +
@@ -686,7 +687,7 @@ public class TestParquetWriter extends BaseTestQuery {
// test nullable interval day type
tableName = "drill_1980_t4";
- test("create table %s as " +
+ run("create table %s as " +
"select " +
"cast(intervalday_col as interval day) col1, " +
"cast(intervalday_col as interval day) + interval '1' day col2 " +
@@ -727,25 +728,14 @@ public class TestParquetWriter extends BaseTestQuery {
.go();
}
- private static void deleteTableIfExists(String tableName) {
- try {
- Path path = new Path(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName);
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- } catch (Exception e) {
- // ignore exceptions.
- }
- }
-
public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
try {
- deleteTableIfExists(outputFile);
+ run("drop table if exists dfs.tmp.%s", outputFile);
final String query = String.format("SELECT %s FROM %s", selection, inputTable);
- test("use dfs.tmp");
- test("CREATE TABLE %s AS %s", outputFile, query);
+ run("use dfs.tmp");
+ run("CREATE TABLE %s AS %s", outputFile, query);
testBuilder()
.unOrdered()
.sqlQuery(query)
@@ -763,7 +753,7 @@ public class TestParquetWriter extends BaseTestQuery {
assertEquals(DrillVersionInfo.getVersion(), version);
}
} finally {
- deleteTableIfExists(outputFile);
+ run("drop table if exists dfs.tmp.%s", outputFile);
}
}
@@ -776,10 +766,10 @@ public class TestParquetWriter extends BaseTestQuery {
public void testImpalaParquetInt96() throws Exception {
compareParquetReadersColumnar("field_impala_ts",
"cp.`parquet/int96_impala_1.parquet`");
try {
- alterSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP, true);
+ client.alterSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP,
true);
compareParquetReadersColumnar("field_impala_ts",
"cp.`parquet/int96_impala_1.parquet`");
} finally {
- resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
}
}
@@ -806,7 +796,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(convertToLocalDateTime("1970-01-01 00:00:01.000"))
.build().run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
}
}
@@ -822,7 +812,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(convertToLocalDateTime("2017-12-06 16:38:43.988"))
.build().run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
}
}
@@ -841,12 +831,12 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
} finally {
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@@ -897,7 +887,7 @@ public class TestParquetWriter extends BaseTestQuery {
try (FileWriter fw = new FileWriter(input2)) {
fw.append("{\"b\":\"foo\"}\n");
}
- test("select * from " + "dfs.`" + dir.getAbsolutePath() + "`");
+ run("select * from " + "dfs.`" + dir.getAbsolutePath() + "`");
runTestAndValidate("*", "*", "dfs.`" + dir.getAbsolutePath() + "`",
"schema_change_parquet");
}
@@ -959,7 +949,18 @@ public class TestParquetWriter extends BaseTestQuery {
.build()
.run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ }
+ }
+
+ @Test
+ public void testTPCHReadWriteFormatV2() throws Exception {
+ try {
+ client.alterSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION, "v2");
+ String inputTable = "cp.`tpch/supplier.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_v2");
+ } finally {
+ client.resetSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION);
}
}
@@ -979,22 +980,22 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
public void testTPCHReadWriteSnappy() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
String inputTable = "cp.`tpch/supplier.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
@Test
public void testTPCHReadWriteGzip() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
String inputTable = "cp.`supplier_gzip.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
@@ -1006,56 +1007,56 @@ public class TestParquetWriter extends BaseTestQuery {
@EnabledIfSystemProperty(named = "os.arch", matches = "x86_64") // reported for OS X on AMD64
public void testTPCHReadWriteBrotli() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
// exercise the new Parquet record reader with this parquet-mr-backed codec
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
String inputTable = "cp.`supplier_brotli.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_brotli");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@Test
public void testTPCHReadWriteLz4() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
// exercise the async Parquet column reader with this aircompressor-backed codec
- alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
+ client.alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
String inputTable = "cp.`supplier_lz4.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lz4");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
- resetSessionOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC);
}
}
@Test
public void testTPCHReadWriteLzo() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
// exercise the async Parquet page reader with this aircompressor-backed codec
- alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
+ client.alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
String inputTable = "cp.`supplier_lzo.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lzo");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
- resetSessionOption(ExecConstants.PARQUET_PAGEREADER_ASYNC);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_PAGEREADER_ASYNC);
}
}
@Test
public void testTPCHReadWriteZstd() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
+ client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
// exercise the new Parquet record reader with this aircompressor-backed codec
- alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
String inputTable = "cp.`supplier_zstd.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_zstd");
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
- resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+ client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@@ -1077,7 +1078,7 @@ public class TestParquetWriter extends BaseTestQuery {
.build()
.run();
} finally {
- resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
}
}
@@ -1085,8 +1086,8 @@ public class TestParquetWriter extends BaseTestQuery {
public void testWriteDecimalIntBigIntFixedLen() throws Exception {
String tableName = "decimalIntBigIntFixedLen";
try {
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ run(
"create table dfs.tmp.%s as\n" +
"select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
"cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1106,9 +1107,9 @@ public class TestParquetWriter extends BaseTestQuery {
new BigDecimal("123456.789123456789"))
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1116,9 +1117,9 @@ public class TestParquetWriter extends BaseTestQuery {
public void testWriteDecimalIntBigIntBinary() throws Exception {
String tableName = "decimalIntBigIntBinary";
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ run(
"create table dfs.tmp.%s as\n" +
"select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
"cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1138,9 +1139,9 @@ public class TestParquetWriter extends BaseTestQuery {
new BigDecimal("123456.789123456789"))
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1148,9 +1149,9 @@ public class TestParquetWriter extends BaseTestQuery {
public void testWriteDecimalFixedLenOnly() throws Exception {
String tableName = "decimalFixedLenOnly";
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ run(
"create table dfs.tmp.%s as\n" +
"select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
"cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1170,9 +1171,9 @@ public class TestParquetWriter extends BaseTestQuery {
new BigDecimal("123456.789123456789"))
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1180,9 +1181,9 @@ public class TestParquetWriter extends BaseTestQuery {
public void testWriteDecimalBinaryOnly() throws Exception {
String tableName = "decimalBinaryOnly";
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ run(
"create table dfs.tmp.%s as\n" +
"select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
"cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1202,9 +1203,9 @@ public class TestParquetWriter extends BaseTestQuery {
new BigDecimal("123456.789123456789"))
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1228,9 +1229,9 @@ public class TestParquetWriter extends BaseTestQuery {
fixedLen.add(new BigDecimal("0.000000"));
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ run(
        "create table dfs.tmp.%s as\n" +
        "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
checkTableTypes(tableName,
@@ -1247,9 +1248,9 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(ints, longs, fixedLen, fixedLen)
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1273,9 +1274,9 @@ public class TestParquetWriter extends BaseTestQuery {
fixedLen.add(new BigDecimal("0.000000"));
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ run(
        "create table dfs.tmp.%s as\n" +
        "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
checkTableTypes(tableName,
@@ -1292,9 +1293,9 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(ints, longs, fixedLen, fixedLen)
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1317,9 +1318,9 @@ public class TestParquetWriter extends BaseTestQuery {
fixedLen.add(new BigDecimal("-999999999999.999999"));
fixedLen.add(new BigDecimal("0.000000"));
try {
- alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
- alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
- test(
+ client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ run(
        "create table dfs.tmp.%s as\n" +
        "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
checkTableTypes(tableName,
@@ -1336,9 +1337,9 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(ints, longs, fixedLen, fixedLen)
.go();
} finally {
- resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
- resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
- test("drop table if exists dfs.tmp.%s", tableName);
+ client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ run("drop table if exists dfs.tmp.%s", tableName);
}
}
@@ -1346,7 +1347,7 @@ public class TestParquetWriter extends BaseTestQuery {
public void testCtasForList() throws Exception {
String tableName = "testCtasForList";
try {
- test("CREATE TABLE `%s`.`%s` AS SELECT l FROM
cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName);
+ run("CREATE TABLE `%s`.`%s` AS SELECT l FROM
cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName);
testBuilder()
.sqlQuery("SELECT * FROM `%s`.`/%s` LIMIT 1", DFS_TMP_SCHEMA,
tableName)
.unOrdered()
@@ -1354,7 +1355,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(asList(4L, 2L))
.go();
} finally {
- test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+ run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
}
}
@@ -1362,7 +1363,7 @@ public class TestParquetWriter extends BaseTestQuery {
public void testCtasForRepeatedList() throws Exception {
String tableName = "testCtasForRepeatedList";
try {
- test("CREATE TABLE `%s`.`%s` AS SELECT * FROM
cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName);
+ run("CREATE TABLE `%s`.`%s` AS SELECT * FROM
cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName);
testBuilder()
.sqlQuery("SELECT rl FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
.unOrdered()
@@ -1371,7 +1372,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(asList(asList(9L, 7L), asList(4L, 8L)))
.go();
} finally {
- test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+ run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
}
}
@@ -1379,7 +1380,7 @@ public class TestParquetWriter extends BaseTestQuery {
public void testCtasForRepeatedListOfMaps() throws Exception {
String tableName = "testCtasForRepeatedListOfMaps";
try {
- test("CREATE TABLE `%s`.`%s` AS SELECT * FROM
cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName);
+ run("CREATE TABLE `%s`.`%s` AS SELECT * FROM
cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName);
testBuilder()
.sqlQuery("SELECT * FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
.unOrdered()
@@ -1394,7 +1395,7 @@ public class TestParquetWriter extends BaseTestQuery {
))
.go();
} finally {
- test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+ run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
}
}
@@ -1402,8 +1403,8 @@ public class TestParquetWriter extends BaseTestQuery {
public void testCTASWithDictInSelect() throws Exception {
String tableName = "table_with_dict";
try {
- test("use dfs.tmp");
- test("create table %s as select id, mapcol from
cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
+ run("use dfs.tmp");
+ run("create table %s as select id, mapcol from
cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
testBuilder()
.sqlQuery("select * from %s", tableName)
.unOrdered()
@@ -1415,7 +1416,7 @@ public class TestParquetWriter extends BaseTestQuery {
.baselineValues(2, TestBuilder.mapOfObject("a", 1, "b", 2, "c", 3))
.go();
} finally {
- test("DROP TABLE IF EXISTS %s", tableName);
+ run("DROP TABLE IF EXISTS %s", tableName);
}
}
@@ -1423,8 +1424,8 @@ public class TestParquetWriter extends BaseTestQuery {
public void testCTASWithRepeatedDictInSelect() throws Exception {
String tableName = "table_with_dict_array";
try {
- test("use dfs.tmp");
- test("create table %s as select id, map_array from
cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
+ run("use dfs.tmp");
+ run("create table %s as select id, map_array from
cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
testBuilder()
.sqlQuery("select * from %s", tableName)
.unOrdered()
@@ -1471,7 +1472,44 @@ public class TestParquetWriter extends BaseTestQuery {
)
.go();
} finally {
- test("DROP TABLE IF EXISTS %s", tableName);
+ run("DROP TABLE IF EXISTS %s", tableName);
+ }
+ }
+
+ @Test
+ public void testFormatConfigOpts() throws Exception {
+ FileSystemConfig pluginConfig = (FileSystemConfig) cluster.storageRegistry().copyConfig("dfs");
+ FormatPluginConfig backupConfig = pluginConfig.getFormats().get("parquet");
+
+ cluster.defineFormat("dfs", "parquet", new ParquetFormatConfig(
+ false,
+ true,
+ 123,
+ 456,
+ true,
+ "snappy",
+ "binary",
+ true,
+ "v2"
+ )
+ );
+
+ try {
+ String query = "select * from dfs.`parquet/int96_dict_change`";
+ queryBuilder()
+ .sql(query)
+ .jsonPlanMatcher()
+ .include("\"autoCorrectCorruptDates\" : false")
+ .include("\"enableStringsSignedMinMax\" : true")
+ .include("\"blockSize\" : 123")
+ .include("\"pageSize\" : 456")
+ .include("\"useSingleFSBlock\" : true")
+ .include("\"writerCompressionType\" : \"snappy\"")
+ .include("\"writerUsePrimitivesForDecimals\" : true")
+ .include("\"writerFormatVersion\" : \"v2\"")
+ .match();
+ } finally {
+ cluster.defineFormat("dfs", "parquet", backupConfig);
}
}
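
Note: the nine positional arguments to new ParquetFormatConfig(...) above line up
with the JSON keys matched by the plan matcher: autoCorrectCorruptDates=false,
enableStringsSignedMinMax=true, blockSize=123, pageSize=456, useSingleFSBlock=true,
writerCompressionType="snappy", writerLogicalTypeForDecimals="binary",
writerUsePrimitivesForDecimals=true, writerFormatVersion="v2". As a more readable
sketch, the builder exercised in TestParquetReaderConfig further down could express
the same config; only autoCorrectCorruptDates() and enableStringsSignedMinMax()
appear in this diff, so the remaining setter names are assumptions mirroring the
field names:

    ParquetFormatConfig v2Config = ParquetFormatConfig.builder()
        .autoCorrectCorruptDates(false)
        .enableStringsSignedMinMax(true)
        .blockSize(123)                         // assumed setter name
        .pageSize(456)                          // assumed setter name
        .useSingleFSBlock(true)                 // assumed setter name
        .writerCompressionType("snappy")        // assumed setter name
        .writerLogicalTypeForDecimals("binary") // assumed setter name
        .writerUsePrimitivesForDecimals(true)   // assumed setter name
        .writerFormatVersion("v2")              // assumed setter name
        .build();
    cluster.defineFormat("dfs", "parquet", v2Config);
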
@@ -1493,15 +1531,26 @@ public class TestParquetWriter extends BaseTestQuery {
for (Pair<String, PrimitiveType.PrimitiveTypeName> nameType :
columnsToCheck) {
assertEquals(
- String.format("Table %s does not contain column %s with type %s",
- tableName, nameType.getKey(), nameType.getValue()),
nameType.getValue(),
-
schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName());
+
schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName(),
+ String.format(
+ "Table %s does not contain column %s with type %s",
+ tableName,
+ nameType.getKey(),
+ nameType.getValue()
+ )
+ );
assertEquals(
- String.format("Table %s %s column %s with DECIMAL type", tableName,
- isDecimalType ? "does not contain" : "contains unexpected",
nameType.getKey()),
- isDecimalType, schema.getType(nameType.getKey()).getOriginalType() ==
OriginalType.DECIMAL);
+ isDecimalType,
+ schema.getType(nameType.getKey()).getLogicalTypeAnnotation()
instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation,
+ String.format(
+ "Table %s %s column %s with DECIMAL type",
+ tableName,
+ isDecimalType ? "does not contain" : "contains unexpected",
+ nameType.getKey()
+ )
+ );
}
}
}
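
Note: two API migrations meet in the checkTableTypes() hunk above. JUnit 4's
assertEquals(message, expected, actual) becomes JUnit 5's
assertEquals(expected, actual, message), and parquet-mr's deprecated OriginalType
comparison becomes a LogicalTypeAnnotation instanceof test. In miniature (method
name, schema and column are illustrative stand-ins):

    import static org.junit.jupiter.api.Assertions.assertTrue;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;

    static void assertDecimalColumn(MessageType schema, String column) {
      // Replaces: schema.getType(column).getOriginalType() == OriginalType.DECIMAL
      assertTrue(
          schema.getType(column).getLogicalTypeAnnotation()
              instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation,
          column + " should carry a DECIMAL logical type annotation");
    }
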
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 2889b71..d9848b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -54,7 +54,14 @@ public class TestFormatPluginOptionExtractor extends BaseTest {
assertEquals("(type: String, name: String)", d.presentParams());
break;
case "parquet":
- assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates:
boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
+ assertEquals(
+ d.typeName,
+ "(type: String, autoCorrectCorruptDates: boolean,
enableStringsSignedMinMax: boolean, " +
+ "blockSize: Integer, pageSize: Integer, useSingleFSBlock:
Boolean, writerCompressionType: String, " +
+ "writerLogicalTypeForDecimals: String,
writerUsePrimitivesForDecimals: Boolean, " +
+ "writerFormatVersion: String)",
+ d.presentParams()
+ );
break;
case "json":
assertEquals(d.typeName, "(type: String)", d.presentParams());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
index 6b6341a..f44a56a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
@@ -22,6 +22,7 @@ import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.test.BaseTest;
import org.apache.hadoop.conf.Configuration;
@@ -97,28 +98,32 @@ public class TestParquetReaderConfig extends BaseTest {
@Test
public void testPriorityAssignmentForStringsSignedMinMax() throws Exception {
@SuppressWarnings("resource")
- SystemOptionManager options = new SystemOptionManager(DrillConfig.create()).init();
+ SystemOptionManager sysOpts = new SystemOptionManager(DrillConfig.create()).init();
+ SessionOptionManager sessOpts = new SessionOptionManager(sysOpts, null);
  // use value from format config
  ParquetFormatConfig formatConfig = new ParquetFormatConfig();
  ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
- assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+ assertEquals(formatConfig.isEnableStringsSignedMinMax(), readerConfig.enableStringsSignedMinMax());
  // change format config value
- formatConfig = new ParquetFormatConfig(true, true);
+ formatConfig = ParquetFormatConfig.builder()
+   .autoCorrectCorruptDates(true)
+   .enableStringsSignedMinMax(true)
+   .build();
+
  readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
- assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+ assertEquals(formatConfig.isEnableStringsSignedMinMax(), readerConfig.enableStringsSignedMinMax());
- // set option, option value should have higher priority
- options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+ // set system option, option value should not have higher priority
+ sysOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+ readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(sysOpts).build();
+ assertTrue(readerConfig.enableStringsSignedMinMax());
- readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+ // set session option, option value should have higher priority
+ sessOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+ readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(sessOpts).build();
  assertFalse(readerConfig.enableStringsSignedMinMax());
-
- // set option as empty (undefined), config should have higher priority
- options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "");
- readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
- assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
}
@@ -128,4 +133,4 @@ public class TestParquetReaderConfig extends BaseTest {
assertEquals(expectedValue, actualValue);
}
-}
\ No newline at end of file
+}
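
Note: the reworked test pins down the precedence ParquetReaderConfig resolves: a
session-scoped option overrides the format config, while a system-scoped one no
longer does, and the old empty-string-defers-to-config case is dropped from the
test. Schematically, reusing the names from the hunk above:

    // Format config says true; a SYSTEM-level false does not override it...
    sysOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
    assertTrue(ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig).withOptions(sysOpts).build()
        .enableStringsSignedMinMax());

    // ...but a SESSION-level false does.
    sessOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
    assertFalse(ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig).withOptions(sessOpts).build()
        .enableStringsSignedMinMax());
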
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
index 1afd3b3..8cae668 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
@@ -38,6 +38,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.math.BigDecimal;
+import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
@@ -112,7 +113,9 @@ public class TestVarlenDecimal extends ClusterTest {
"select cast('%s' as decimal(36, 9)) dec36", tableName,
bigDecimalValue);
String json = FileUtils.readFileToString(
- Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName,
"0_0_0.json").toFile());
+ Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName,
"0_0_0.json").toFile(),
+ Charset.defaultCharset()
+ );
Assert.assertThat(json, CoreMatchers.containsString(bigDecimalValue));
@@ -142,7 +145,9 @@ public class TestVarlenDecimal extends ClusterTest {
"select cast('%s' as decimal(36, 9)) dec36", tableName,
bigDecimalValue);
String csv = FileUtils.readFileToString(
- Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName,
"0_0_0.csvh").toFile());
+ Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName,
"0_0_0.csvh").toFile(),
+ Charset.defaultCharset()
+ );
Assert.assertThat(csv, CoreMatchers.containsString(bigDecimalValue));
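
Note: both hunks above switch commons-io's readFileToString to the Charset-aware
overload; the single-argument variant is deprecated because it silently uses the
platform default encoding. A standalone sketch (the file path is illustrative):

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import org.apache.commons.io.FileUtils;

    // Charset.defaultCharset(), as used above, keeps the old behaviour but makes
    // it explicit; StandardCharsets.UTF_8 would pin the encoding outright.
    String json = FileUtils.readFileToString(
        new File("/path/to/0_0_0.json"), StandardCharsets.UTF_8);
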