This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f3654  DRILL-1282: Add read and write support for Parquet v2 (#2351)
17f3654 is described below

commit 17f3654919b8429b12e321fb4ee837b7a52e06f1
Author: James Turton <[email protected]>
AuthorDate: Tue Nov 23 17:30:36 2021 +0200

    DRILL-1282: Add read and write support for Parquet v2 (#2351)
    
    * Add an option for Parquet v2 writing.
    
    * Finish adding Parquet v2 write support.
    
    * Fix LGTM error inadvertently merged in #2366.
    
    * Add parquet format config options for version and various other parameters.
    
    * Fix persistence and application of config opts.
    
    * Fix TestFormatPluginOptionExtractor failure.
    
    * Add a test of parquet format config writer options.
    
    * Fix checkstyle violations.
    
    * Address review comments.
    
    * Upgrade TestParquetWriter to ClusterTest to make a home for the parquet format config test.
    
    * Change Parquet version strings to "v1", "v2".
    
    * Convert comment containing an @link to Javadoc.
---
 .../org/apache/drill/categories/ParquetTest.java   |   4 +
 .../java/org/apache/drill/exec/ExecConstants.java  |  11 +
 .../drill/exec/server/options/OptionValue.java     |  10 +
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../drill/exec/server/rest/StorageResources.java   |   2 +-
 .../exec/store/parquet/ParquetFormatConfig.java    | 129 +++++---
 .../exec/store/parquet/ParquetFormatPlugin.java    | 103 ++++--
 .../drill/exec/store/parquet/ParquetGroupScan.java |   4 +-
 .../exec/store/parquet/ParquetReaderConfig.java    |  15 +-
 .../exec/store/parquet/ParquetRecordWriter.java    |  21 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../physical/impl/writer/TestParquetWriter.java    | 355 ++++++++++++---------
 .../store/dfs/TestFormatPluginOptionExtractor.java |   9 +-
 .../store/parquet/TestParquetReaderConfig.java     |  31 +-
 .../exec/store/parquet/TestVarlenDecimal.java      |   9 +-
 15 files changed, 452 insertions(+), 253 deletions(-)
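
For reference, the new writer option can be exercised end to end in the style of the
updated TestParquetWriter below. A minimal sketch (the table name supplier_v2 is
illustrative; the option name and the "v1"/"v2" values come from this commit):

    @Test
    public void writeParquetV2Sketch() throws Exception {
      try {
        // Write a copy of the TPC-H supplier table using the Parquet v2 format.
        client.alterSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION, "v2");
        run("CREATE TABLE dfs.tmp.`supplier_v2` AS SELECT * FROM cp.`tpch/supplier.parquet`");
      } finally {
        // Restore the default ("v1", per drill-module.conf below).
        client.resetSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION);
      }
    }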

diff --git a/common/src/test/java/org/apache/drill/categories/ParquetTest.java b/common/src/test/java/org/apache/drill/categories/ParquetTest.java
index aafaeb4..b7d7668 100644
--- a/common/src/test/java/org/apache/drill/categories/ParquetTest.java
+++ b/common/src/test/java/org/apache/drill/categories/ParquetTest.java
@@ -21,4 +21,8 @@ package org.apache.drill.categories;
  * This is a category used to mark unit tests that test Drill Parquet support.
  */
 public interface ParquetTest {
+  /**
+   * Tag for JUnit 5.
+   */
+  String TAG = "parquet-test";
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 16d21b5..ef5cd3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec;
 
+import java.util.Arrays;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
@@ -36,6 +37,7 @@ import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidat
 import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
+import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -375,6 +377,15 @@ public final class ExecConstants {
   public static final OptionValidator PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS_VALIDATOR = new EnumeratedStringValidator(PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
       new OptionDescription("Parquet writer logical type for decimal; supported types \'fixed_len_byte_array\' and \'binary\'"),
       "fixed_len_byte_array", "binary");
+  public static final String PARQUET_WRITER_FORMAT_VERSION = "store.parquet.writer.format_version";
+  public static final OptionValidator PARQUET_WRITER_FORMAT_VERSION_VALIDATOR = new EnumeratedStringValidator(
+    PARQUET_WRITER_FORMAT_VERSION,
+    new OptionDescription(
+      "Parquet format version used for storing Parquet output.  Allowed values:" +
+        Arrays.toString(ParquetFormatPlugin.PARQUET_FORMAT_VERSIONS)
+    ),
+    ParquetFormatPlugin.PARQUET_FORMAT_VERSIONS
+  );
 
   // TODO - The below two options don't seem to be used in the Drill code base
   @Deprecated // TODO: DRILL-6527
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index e74f46a..b00cfce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -240,6 +240,16 @@ public class OptionValue implements Comparable<OptionValue> {
     }
   }
 
+  /**
+   * Gets the value of this option if it exists at a scope at least as narrow as the given scope.
+   * @param minScope the widest scope at which the option may be set for its value to be returned
+   * @return the option's value, or null if the option does not exist at a scope at least as narrow as minScope
+   */
+  @JsonIgnore
+  public Object getValueMinScope(OptionScope minScope) {
+    return scope.compareTo(minScope) >= 0 ? getValue() : null;
+  }
+
   @JsonIgnore
   public OptionScope getScope() {
     return scope;
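
The new getValueMinScope is what lets session- and query-scoped options take precedence
over format plugin config in ParquetFormatPlugin below. A self-contained sketch of its
logic (the ordering BOOT < SYSTEM < SESSION < QUERY is an assumption based on the
declaration order of Drill's OptionScope enum):

    enum Scope { BOOT, SYSTEM, SESSION, QUERY }

    // Mirrors OptionValue.getValueMinScope: narrower scopes compare greater, so the
    // value is returned only when the option was set at minScope or narrower.
    static Object valueMinScope(Scope optionScope, Scope minScope, Object value) {
      return optionScope.compareTo(minScope) >= 0 ? value : null;
    }

    // valueMinScope(Scope.SESSION, Scope.SESSION, 128) -> 128
    // valueMinScope(Scope.SYSTEM,  Scope.SESSION, 128) -> null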
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index fa3c3f5..0122fe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -172,6 +172,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_WRITER_FORMAT_VERSION_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index a6c958c..dcba516 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -271,7 +271,7 @@ public class StorageResources {
     } catch (PluginException e) {
       logger.error("Error while saving plugin", e);
       return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
-        .entity(message("Error while saving plugin: ", e.getMessage()))
+        .entity(message("Error while saving plugin: %s", e.getMessage()))
         .build();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 4370b92..4367dc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -18,81 +18,110 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Objects;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+@EqualsAndHashCode
 @JsonTypeName("parquet") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class ParquetFormatConfig implements FormatPluginConfig {
 
-  private final boolean autoCorrectCorruptDates;
-  private final boolean enableStringsSignedMinMax;
-
-  public ParquetFormatConfig() {
-    this(true, false);
-  }
-
-  @JsonCreator
-  public ParquetFormatConfig(@JsonProperty("autoCorrectCorruptDates") Boolean autoCorrectCorruptDates,
-      @JsonProperty("enableStringsSignedMinMax") boolean enableStringsSignedMinMax) {
-    this.autoCorrectCorruptDates = autoCorrectCorruptDates == null ? true : autoCorrectCorruptDates;
-    this.enableStringsSignedMinMax = enableStringsSignedMinMax;
-  }
-
   /**
-   * @return true if auto correction of corrupt dates is enabled, false otherwise
+   * Until DRILL-4203 was resolved, Drill could write non-standard dates into
+   * parquet files. This issue is related to all Drill releases where {@link
+   * org.apache.drill.exec.store.parquet.ParquetRecordWriter#WRITER_VERSION_PROPERTY}
+   * < {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#DRILL_WRITER_VERSION_STD_DATE_FORMAT}.
+   *
+   * The values have been read correctly by Drill, but external tools like
+   * Spark reading the files will see corrupted values for all dates that
+   * have been written by Drill.  To maintain compatibility with old files,
+   * the parquet reader code has been given the ability to check for the
+   * old format and automatically shift the corrupted values into corrected
+   * ones.
    */
-  @JsonIgnore
-  public boolean areCorruptDatesAutoCorrected() {
-    return autoCorrectCorruptDates;
-  }
+  @Getter private final boolean autoCorrectCorruptDates;
 
   /**
-   * Parquet statistics for UTF-8 data for files created prior to 1.9.1 parquet library version was stored incorrectly.
-   * If user exactly knows that data in binary columns is in ASCII (not UTF-8), turning this property to 'true'
-   * enables statistics usage for varchar and decimal columns.
-   *
-   * Can be overridden for individual tables using
-   * @link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX} session option.
+   * Parquet statistics for UTF-8 data in files created prior to parquet
+   * library version 1.9.1 were stored incorrectly.  If the user knows for
+   * certain that the data in binary columns is ASCII (not UTF-8), setting this
+   * property to 'true' enables statistics usage for varchar and decimal columns.
    *
-   * @return true if string signed min max enabled, false otherwise
+   * {@link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX}
    */
-  @JsonIgnore
-  public boolean isStringsSignedMinMaxEnabled() {
-    return enableStringsSignedMinMax;
-  }
+  @Getter private final boolean enableStringsSignedMinMax;
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    ParquetFormatConfig that = (ParquetFormatConfig) o;
-    return Objects.equals(autoCorrectCorruptDates, that.autoCorrectCorruptDates) &&
-           Objects.equals(enableStringsSignedMinMax, that.enableStringsSignedMinMax);
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_BLOCK_SIZE}
+  @Getter private final Integer blockSize;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_PAGE_SIZE}
+  @Getter private final Integer pageSize;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_USE_SINGLE_FS_BLOCK}
+  @Getter private final Boolean useSingleFSBlock;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_COMPRESSION_TYPE}
+  @Getter private final String writerCompressionType;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS}
+  @Getter private final String writerLogicalTypeForDecimals;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS}
+  @Getter private final Boolean writerUsePrimitivesForDecimals;
+
+  // {@link org.apache.drill.exec.ExecConstants#PARQUET_WRITER_FORMAT_VERSION}
+  @Getter private final String writerFormatVersion;
+
+  public ParquetFormatConfig() {
+    // config opts which are also system opts must default to null so as not
+    // to override system opts.
+    this(true, false, null, null, null, null, null, null, null);
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(autoCorrectCorruptDates, enableStringsSignedMinMax);
+  @JsonCreator
+  @Builder
+  public ParquetFormatConfig(
+    @JsonProperty("autoCorrectCorruptDates") Boolean autoCorrectCorruptDates,
+    @JsonProperty("enableStringsSignedMinMax") boolean enableStringsSignedMinMax,
+    @JsonProperty("blockSize") Integer blockSize,
+    @JsonProperty("pageSize") Integer pageSize,
+    @JsonProperty("useSingleFSBlock") Boolean useSingleFSBlock,
+    @JsonProperty("writerCompressionType") String writerCompressionType,
+    @JsonProperty("writerLogicalTypeForDecimals") String writerLogicalTypeForDecimals,
+    @JsonProperty("writerUsePrimitivesForDecimals") Boolean writerUsePrimitivesForDecimals,
+    @JsonProperty("writerFormatVersion") String writerFormatVersion
+  ) {
+    this.autoCorrectCorruptDates = autoCorrectCorruptDates == null ? true : autoCorrectCorruptDates;
+    this.enableStringsSignedMinMax = enableStringsSignedMinMax;
+    this.blockSize = blockSize;
+    this.pageSize = pageSize;
+    this.useSingleFSBlock = useSingleFSBlock;
+    this.writerCompressionType = writerCompressionType;
+    this.writerLogicalTypeForDecimals = writerLogicalTypeForDecimals;
+    this.writerUsePrimitivesForDecimals = writerUsePrimitivesForDecimals;
+    this.writerFormatVersion = writerFormatVersion;
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
-        .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
-        .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
-        .toString();
+      .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
+      .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
+      .field("blockSize", blockSize)
+      .field("pageSize", pageSize)
+      .field("useSingleFSBlock", useSingleFSBlock)
+      .field("writerCompressionType", writerCompressionType)
+      .field("writerLogicalTypeForDecimals", writerLogicalTypeForDecimals)
+      .field("writerUsePrimitivesForDecimals", writerUsePrimitivesForDecimals)
+      .field("writerFormatVersion", writerFormatVersion)
+      .toString();
   }
 }
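
Since the new constructor is annotated with Lombok's @Builder, the config can also be
assembled programmatically. A sketch assuming the builder methods Lombok derives from
the constructor parameters (the values are illustrative):

    ParquetFormatConfig config = ParquetFormatConfig.builder()
        .writerCompressionType("snappy") // leaving a field unset keeps it null
        .writerFormatVersion("v2")
        .build();
    // Null fields deliberately do not override Drill's system options, matching
    // the comment in the no-arg constructor above.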
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9129fc8..b1a614f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -49,6 +51,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -78,6 +81,9 @@ import org.slf4j.LoggerFactory;
 
 public class ParquetFormatPlugin implements FormatPlugin {
 
+  /** {@link org.apache.parquet.column.ParquetProperties.WriterVersion} */
+  public static final String[] PARQUET_FORMAT_VERSIONS = { "v1", "v2" };
+
   private static final Logger logger = LoggerFactory.getLogger(ParquetFormatPlugin.class);
 
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -141,34 +147,87 @@ public class ParquetFormatPlugin implements FormatPlugin {
   }
 
   public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
-    Map<String, String> options = new HashMap<>();
+    Map<String, String> writerOpts = new HashMap<>();
+    OptionManager contextOpts = context.getOptions();
 
-    options.put("location", writer.getLocation());
+    writerOpts.put("location", writer.getLocation());
 
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
-    options.put("prefix", fragmentId);
-
-    options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
-    options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
-      context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
-    options.put(ExecConstants.PARQUET_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_PAGE_SIZE).num_val.toString());
-    options.put(ExecConstants.PARQUET_DICT_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_DICT_PAGE_SIZE).num_val.toString());
-
-    options.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
-        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val);
-
-    options.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
-        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString());
-
-    options.put(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
-        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS).string_val);
-
-    options.put(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS,
-        context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS).bool_val.toString());
+    writerOpts.put("prefix", fragmentId);
+
+    // Many options which follow may be set as Drill config options or in the parquet format
+    // plugin config.  If there is a Drill option set at session scope or narrower it takes precedence.
+    OptionValue.OptionScope minScope = OptionValue.OptionScope.SESSION;
+
+    writerOpts.put(ExecConstants.PARQUET_BLOCK_SIZE,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_BLOCK_SIZE).getValueMinScope(minScope),
+        config.getBlockSize(),
+        contextOpts.getInt(ExecConstants.PARQUET_BLOCK_SIZE)
+      ).toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).getValueMinScope(minScope),
+        config.getUseSingleFSBlock(),
+        contextOpts.getBoolean(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK)
+      ).toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_PAGE_SIZE,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_PAGE_SIZE).getValueMinScope(minScope),
+        config.getPageSize(),
+        contextOpts.getInt(ExecConstants.PARQUET_PAGE_SIZE)
+      ).toString()
+    );
+
+    // "internal use" so not settable in format config
+    writerOpts.put(ExecConstants.PARQUET_DICT_PAGE_SIZE,
+      contextOpts.getOption(ExecConstants.PARQUET_DICT_PAGE_SIZE).num_val.toString()
+    );
+
+    // "internal use" so not settable in format config
+    writerOpts.put(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
+      contextOpts.getOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val.toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).getValueMinScope(minScope),
+        config.getWriterCompressionType(),
+        contextOpts.getString(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)
+      ).toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS).getValueMinScope(minScope),
+        config.getWriterLogicalTypeForDecimals(),
+        contextOpts.getString(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS)
+      ).toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS).getValueMinScope(minScope),
+        config.getWriterUsePrimitivesForDecimals(),
+        contextOpts.getBoolean(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS)
+      ).toString()
+    );
+
+    writerOpts.put(ExecConstants.PARQUET_WRITER_FORMAT_VERSION,
+      ObjectUtils.firstNonNull(
+        contextOpts.getOption(ExecConstants.PARQUET_WRITER_FORMAT_VERSION).getValueMinScope(minScope),
+        config.getWriterFormatVersion(),
+        contextOpts.getString(ExecConstants.PARQUET_WRITER_FORMAT_VERSION)
+      ).toString()
+    );
 
     RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
-    recordWriter.init(options);
+    recordWriter.init(writerOpts);
 
     return recordWriter;
   }
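
Each ObjectUtils.firstNonNull chain above encodes the same three-level precedence:
a session- or query-scoped Drill option wins, then the format plugin config, then
the system/boot default. Schematically, for a single option:

    Object pageSize = ObjectUtils.firstNonNull(
        contextOpts.getOption(ExecConstants.PARQUET_PAGE_SIZE)
            .getValueMinScope(minScope),                     // 1. session/query scope
        config.getPageSize(),                                // 2. format plugin config
        contextOpts.getInt(ExecConstants.PARQUET_PAGE_SIZE)  // 3. system/boot default
    );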
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4d5e178..1af28d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -99,7 +99,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         .withCacheFileRoot(cacheFileRoot)
         .withReaderConfig(readerConfig)
         .withFileSystem(fs)
-        .withCorrectCorruptedDates(this.formatConfig.areCorruptDatesAutoCorrected())
+        .withCorrectCorruptedDates(this.formatConfig.isAutoCorrectCorruptDates())
         .withSchema(schema)
         .build();
 
@@ -146,7 +146,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         .withSelection(selection)
         .withReaderConfig(readerConfig)
         .withFileSystem(fs)
-        .withCorrectCorruptedDates(formatConfig.areCorruptDatesAutoCorrected())
+        .withCorrectCorruptedDates(formatConfig.isAutoCorrectCorruptDates())
         .build();
 
     this.usedMetadataCache = metadataProvider.isUsedMetadataCache();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
index 3b668c7..c7a3db5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetReadOptions;
 
@@ -175,8 +176,8 @@ public class ParquetReaderConfig {
 
       // first assign configuration values from format config
       if (formatConfig != null) {
-        readerConfig.autoCorrectCorruptedDates = formatConfig.areCorruptDatesAutoCorrected();
-        readerConfig.enableStringsSignedMinMax = formatConfig.isStringsSignedMinMaxEnabled();
+        readerConfig.autoCorrectCorruptedDates = formatConfig.isAutoCorrectCorruptDates();
+        readerConfig.enableStringsSignedMinMax = formatConfig.isEnableStringsSignedMinMax();
       }
 
       // then assign configuration values from Hadoop configuration
@@ -186,11 +187,13 @@ public class ParquetReaderConfig {
         readerConfig.enableTimeReadCounter = conf.getBoolean(ENABLE_TIME_READ_COUNTER, readerConfig.enableTimeReadCounter);
       }
 
-      // last assign values from session options, session options have higher priority than other configurations
+      // last assign values from session- or query-scoped options, which have higher priority than other configurations
       if (options != null) {
-        String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
-        if (!option.isEmpty()) {
-          readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
+        String optVal = (String) options.getOption(
+          ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX
+        ).getValueMinScope(OptionValue.OptionScope.SESSION);
+        if (optVal != null && !optVal.isEmpty()) {
+          readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optVal);
         }
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 53a228f..a4b68da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -62,7 +62,10 @@ import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -206,6 +209,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
     useSingleFSBlock = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK));
     usePrimitiveTypesForDecimals = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS));
+    writerVersion = WriterVersion.fromString(
+      writerOptions.get(ExecConstants.PARQUET_WRITER_FORMAT_VERSION)
+    );
 
     if (useSingleFSBlock) {
       // Round up blockSize to multiple of 64K.
@@ -263,20 +269,29 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // We don't want this number to be too small either. Ideally, slightly bigger than the page size,
     // but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+    ValuesWriterFactory valWriterFactory = writerVersion == WriterVersion.PARQUET_1_0
+      ? new DefaultV1ValuesWriterFactory()
+      : new DefaultV2ValuesWriterFactory();
+
     ParquetProperties parquetProperties = ParquetProperties.builder()
         .withPageSize(pageSize)
         .withDictionaryEncoding(enableDictionary)
         .withDictionaryPageSize(initialPageBufferSize)
-        .withWriterVersion(writerVersion)
         .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
-        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+        .withValuesWriterFactory(valWriterFactory)
+        .withWriterVersion(writerVersion)
         .build();
+
     // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
     //   once DRILL-7906 (PARQUET-1006) will be resolved
     pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema,
             parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(),
            parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
-    store = new ColumnWriteStoreV1(pageStore, parquetProperties);
+
+    store = writerVersion == WriterVersion.PARQUET_1_0
+      ? new ColumnWriteStoreV1(schema, pageStore, parquetProperties)
+      : new ColumnWriteStoreV2(schema, pageStore, parquetProperties);
+
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
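
For context, "v1" and "v2" are understood to be the short names accepted by parquet-mr's
ParquetProperties.WriterVersion.fromString (hence the commit message's note about changing
the version strings), and each version selects a matching write store and values-writer
factory:

    WriterVersion writerVersion = WriterVersion.fromString("v2"); // -> PARQUET_2_0
    // PARQUET_1_0 -> ColumnWriteStoreV1 + DefaultV1ValuesWriterFactory
    // PARQUET_2_0 -> ColumnWriteStoreV2 + DefaultV2ValuesWriterFactory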
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a8b1ed3..9160857 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -726,6 +726,7 @@ drill.exec.options: {
     store.parquet.writer.use_primitive_types_for_decimals: true,
     store.parquet.writer.logical_type_for_decimals: "fixed_len_byte_array",
     store.parquet.writer.use_single_fs_block: false,
+    store.parquet.writer.format_version: "v1",
     store.parquet.flat.reader.bulk: true,
     store.parquet.flat.batch.num_records: 32767,
     store.parquet.complex.batch.num_records: 4000,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 093ddd8..cc6c2d1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -21,14 +21,18 @@ import org.apache.calcite.util.Pair;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +41,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.joda.time.Period;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -74,11 +79,11 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @RunWith(Parameterized.class)
 @Category({SlowTest.class, ParquetTest.class})
-public class TestParquetWriter extends BaseTestQuery {
+public class TestParquetWriter extends ClusterTest {
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(new Object[][] { {100} });
@@ -89,8 +94,6 @@ public class TestParquetWriter extends BaseTestQuery {
     dirTestWatcher.copyResourceToRoot(Paths.get("parquet", 
"int96_dict_change"));
   }
 
-  private static FileSystem fs;
-
   // Map storing a convenient name as well as the cast type necessary
   // to produce it casting from a varchar
   private static final Map<String, String> allTypes = new HashMap<>();
@@ -137,14 +140,14 @@ public class TestParquetWriter extends BaseTestQuery {
   public int repeat = 1;
 
   @BeforeClass
-  public static void initFs() throws Exception {
-    fs = getLocalFileSystem();
-    alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    client.alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
   }
 
   @AfterClass
   public static void disableDecimalDataType() {
-    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+    client.resetSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
   }
 
   @Test
@@ -187,8 +190,7 @@ public class TestParquetWriter extends BaseTestQuery {
     new TestConstantFolding.SmallFileCreator(pathDir)
       .setRecord(sb.toString()).createFiles(1, 1, "json");
 
-    test("use dfs.tmp");
-    test("create table WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter as 
select * from dfs.`%s/smallfile/smallfile.json`", path);
+    run("create table 
dfs.tmp.WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter as select * from 
dfs.`%s/smallfile/smallfile.json`", path);
     testBuilder()
         .sqlQuery("select * from 
dfs.tmp.WIDE_PARQUET_TABLE_TestParquetWriter_testLargeFooter")
         .unOrdered()
@@ -204,26 +206,26 @@ public class TestParquetWriter extends BaseTestQuery {
 
     try {
       // read all of the types with the complex reader
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
   @Test
   public void testAllScalarTypesDictionary() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
+      client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
       /// read once with the flat reader
       runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
 
       // read all of the types with the complex reader
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
     }
   }
 
@@ -238,10 +240,10 @@ public class TestParquetWriter extends BaseTestQuery {
     String selection = "type";
     String inputTable = "cp.`donuts.json`";
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
+      client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true);
       runTestAndValidate(selection, selection, inputTable, "donuts_json");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+      client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
     }
   }
 
@@ -279,7 +281,7 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testTPCHReadWrite1_date_convertedType() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
       String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
         "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
       String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
@@ -287,7 +289,7 @@ public class TestParquetWriter extends BaseTestQuery {
       String inputTable = "cp.`tpch/lineitem.parquet`";
       runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet_converted");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+      client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
     }
   }
 
@@ -336,24 +338,24 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testTPCHReadWriteNoDictUncompressed() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "none");
+      client.alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "none");
       String inputTable = "cp.`tpch/supplier.parquet`";
       runTestAndValidate("*", "*", inputTable, "supplier_parquet_no_dict_uncompressed");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
   @Test
   public void testTPCHReadWriteDictGzip() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
       String inputTable = "cp.`tpch/supplier.parquet`";
       runTestAndValidate("*", "*", inputTable, "supplier_parquet_dict_gzip");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
@@ -409,24 +411,24 @@ public class TestParquetWriter extends BaseTestQuery {
     // the old and new readers.
 
     try {
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
       runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
   @Test
   public void testMulipleRowGroups() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024);
+      client.alterSession(ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024);
       String selection = "mi";
       String inputTable = "cp.`customer.json`";
       runTestAndValidate(selection, selection, inputTable, "foodmart_customer_parquet");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_BLOCK_SIZE);
+      client.resetSession(ExecConstants.PARQUET_BLOCK_SIZE);
     }
   }
 
@@ -462,15 +464,15 @@ public class TestParquetWriter extends BaseTestQuery {
     String queryFromWriteOut = "select * from " + outputFile;
 
     try {
-      test("use dfs.tmp");
-      test(ctasStmt);
+      run("use dfs.tmp");
+      run(ctasStmt);
       testBuilder()
           .ordered()
           .sqlQuery(queryFromWriteOut)
           .sqlBaselineQuery(query)
           .build().run();
     } finally {
-      deleteTableIfExists(outputFile);
+      run("drop table if exists dfs.tmp.%s", outputFile);
     }
   }
 
@@ -486,7 +488,7 @@ public class TestParquetWriter extends BaseTestQuery {
         .optionSettingQueriesForBaseline("alter system set 
`store.parquet.use_new_reader` = true")
         .build().run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
@@ -505,7 +507,7 @@ public class TestParquetWriter extends BaseTestQuery {
             "alter system set `store.parquet.use_new_reader` = true")
         .build().run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
@@ -595,8 +597,7 @@ public class TestParquetWriter extends BaseTestQuery {
     String outputTable = "decimal_test";
 
     try {
-      test("use dfs.tmp; " +
-        "create table %s as select " +
+      run("create table dfs.tmp.%s as select " +
         "cast('1.2' as decimal(38, 2)) col1, cast('1.2' as decimal(28, 2)) 
col2 " +
         "from cp.`employee.json` limit 1", outputTable);
 
@@ -604,12 +605,12 @@ public class TestParquetWriter extends BaseTestQuery {
 
       testBuilder()
           .unOrdered()
-          .sqlQuery("select col1, col2 from %s ", outputTable)
+          .sqlQuery("select col1, col2 from dfs.tmp.%s ", outputTable)
           .baselineColumns("col1", "col2")
           .baselineValues(result, result)
           .go();
     } finally {
-      deleteTableIfExists(outputTable);
+      run("drop table if exists dfs.tmp.%s", outputTable);
     }
   }
 
@@ -619,7 +620,7 @@ public class TestParquetWriter extends BaseTestQuery {
     final String newTblName = "testTableOutputSchema";
 
     try {
-      test("CREATE TABLE dfs.tmp.%s(id, name, bday) AS SELECT " +
+      run("CREATE TABLE dfs.tmp.%s(id, name, bday) AS SELECT " +
         "cast(`employee_id` as integer), " +
         "cast(`full_name` as varchar(100)), " +
         "cast(`birth_date` as date) " +
@@ -632,7 +633,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(1, "Sheri Nowmer", LocalDate.parse("1961-08-26"))
           .go();
     } finally {
-      deleteTableIfExists(newTblName);
+      run("drop table if exists dfs.tmp.%s", newTblName);
     }
   }
 
@@ -642,11 +643,11 @@ public class TestParquetWriter extends BaseTestQuery {
  */
   @Test
   public void testCTASWithIntervalTypes() throws Exception { // TODO: investigate NPE errors during the test execution
-    test("use dfs.tmp");
+    run("use dfs.tmp");
 
     String tableName = "drill_1980_t1";
     // test required interval day type
-    test("create table %s as " +
+    run("create table %s as " +
         "select " +
         "interval '10 20:30:40.123' day to second col1, " +
         "interval '-1000000000 20:12:23.999' day(10) to second col2 " +
@@ -659,7 +660,7 @@ public class TestParquetWriter extends BaseTestQuery {
     tableName = "drill_1980_2";
 
     // test required interval year type
-    test("create table %s as " +
+    run("create table %s as " +
         "select " +
         "interval '10-2' year to month col1, " +
         "interval '-100-8' year(3) to month col2 " +
@@ -671,7 +672,7 @@ public class TestParquetWriter extends BaseTestQuery {
     testParquetReaderHelper(tableName, row1Col1, row1Col2, row1Col1, row1Col2);
     // test nullable interval year type
     tableName = "drill_1980_t3";
-    test("create table %s as " +
+    run("create table %s as " +
         "select " +
         "cast (intervalyear_col as interval year) col1," +
         "cast(intervalyear_col as interval year) + interval '2' year col2 " +
@@ -686,7 +687,7 @@ public class TestParquetWriter extends BaseTestQuery {
 
     // test nullable interval day type
     tableName = "drill_1980_t4";
-    test("create table %s as " +
+    run("create table %s as " +
         "select " +
         "cast(intervalday_col as interval day) col1, " +
         "cast(intervalday_col as interval day) + interval '1' day col2 " +
@@ -727,25 +728,14 @@ public class TestParquetWriter extends BaseTestQuery {
         .go();
   }
 
-  private static void deleteTableIfExists(String tableName) {
-    try {
-      Path path = new Path(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName);
-      if (fs.exists(path)) {
-        fs.delete(path, true);
-      }
-    } catch (Exception e) {
-      // ignore exceptions.
-    }
-  }
-
   public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
     try {
-      deleteTableIfExists(outputFile);
+      run("drop table if exists dfs.tmp.%s", outputFile);
 
       final String query = String.format("SELECT %s FROM %s", selection, 
inputTable);
 
-      test("use dfs.tmp");
-      test("CREATE TABLE %s AS %s", outputFile, query);
+      run("use dfs.tmp");
+      run("CREATE TABLE %s AS %s", outputFile, query);
       testBuilder()
           .unOrdered()
           .sqlQuery(query)
@@ -763,7 +753,7 @@ public class TestParquetWriter extends BaseTestQuery {
         assertEquals(DrillVersionInfo.getVersion(), version);
       }
     } finally {
-      deleteTableIfExists(outputFile);
+      run("drop table if exists dfs.tmp.%s", outputFile);
     }
   }
 
@@ -776,10 +766,10 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testImpalaParquetInt96() throws Exception {
     compareParquetReadersColumnar("field_impala_ts", 
"cp.`parquet/int96_impala_1.parquet`");
     try {
-      alterSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP, true);
+      client.alterSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP, true);
      compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
     }
   }
 
@@ -806,7 +796,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(convertToLocalDateTime("1970-01-01 00:00:01.000"))
           .build().run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
     }
   }
 
@@ -822,7 +812,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(convertToLocalDateTime("2017-12-06 16:38:43.988"))
           .build().run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
     }
   }
 
@@ -841,12 +831,12 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
       compareParquetInt96Converters("field_impala_ts", 
"cp.`parquet/int96_impala_1.parquet`");
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       compareParquetInt96Converters("field_impala_ts", 
"cp.`parquet/int96_impala_1.parquet`");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
@@ -897,7 +887,7 @@ public class TestParquetWriter extends BaseTestQuery {
     try (FileWriter fw = new FileWriter(input2)) {
       fw.append("{\"b\":\"foo\"}\n");
     }
-    test("select * from " + "dfs.`" + dir.getAbsolutePath() + "`");
+    run("select * from " + "dfs.`" + dir.getAbsolutePath() + "`");
     runTestAndValidate("*", "*", "dfs.`" + dir.getAbsolutePath() + "`", 
"schema_change_parquet");
   }
 
@@ -959,7 +949,18 @@ public class TestParquetWriter extends BaseTestQuery {
           .build()
           .run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteFormatV2() throws Exception {
+    try {
+      client.alterSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION, "v2");
+      String inputTable = "cp.`tpch/supplier.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_v2");
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_WRITER_FORMAT_VERSION);
     }
   }
 
@@ -979,22 +980,22 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testTPCHReadWriteSnappy() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
       String inputTable = "cp.`tpch/supplier.parquet`";
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
   @Test
   public void testTPCHReadWriteGzip() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
       String inputTable = "cp.`supplier_gzip.parquet`";
         runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
@@ -1006,56 +1007,56 @@ public class TestParquetWriter extends BaseTestQuery {
   @EnabledIfSystemProperty(named = "os.arch", matches = "x86_64") // reported 
for OS X on AMD64
   public void testTPCHReadWriteBrotli() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
       // exercise the new Parquet record reader with this parquet-mr-backed codec
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       String inputTable = "cp.`supplier_brotli.parquet`";
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_brotli");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
   @Test
   public void testTPCHReadWriteLz4() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
       // exercise the async Parquet column reader with this aircompressor-backed codec
-      alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
+      client.alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
       String inputTable = "cp.`supplier_lz4.parquet`";
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lz4");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
-      resetSessionOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC);
     }
   }
 
   @Test
   public void testTPCHReadWriteLzo() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
       // exercise the async Parquet page reader with this aircompressor-backed codec
-      alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
+      client.alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
       String inputTable = "cp.`supplier_lzo.parquet`";
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lzo");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
-      resetSessionOption(ExecConstants.PARQUET_PAGEREADER_ASYNC);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_PAGEREADER_ASYNC);
     }
   }
 
   @Test
   public void testTPCHReadWriteZstd() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
+      client.alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
       // exercise the new Parquet record reader with this aircompressor-backed codec
-      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
       String inputTable = "cp.`supplier_zstd.parquet`";
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_zstd");
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
-      resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER);
+      client.resetSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
     }
   }
 
@@ -1077,7 +1078,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .build()
           .run();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      client.resetSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
     }
   }
 
@@ -1085,8 +1086,8 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testWriteDecimalIntBigIntFixedLen() throws Exception {
     String tableName = "decimalIntBigIntFixedLen";
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+      run(
           "create table dfs.tmp.%s as\n" +
               "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
                      "cast('123456.789123456789' as decimal(18, 12)) as 
decBigInt,\n" +
@@ -1106,9 +1107,9 @@ public class TestParquetWriter extends BaseTestQuery {
               new BigDecimal("123456.789123456789"))
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1116,9 +1117,9 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testWriteDecimalIntBigIntBinary() throws Exception {
     String tableName = "decimalIntBigIntBinary";
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+      run(
           "create table dfs.tmp.%s as\n" +
               "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
                      "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1138,9 +1139,9 @@ public class TestParquetWriter extends BaseTestQuery {
             new BigDecimal("123456.789123456789"))
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1148,9 +1149,9 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testWriteDecimalFixedLenOnly() throws Exception {
     String tableName = "decimalFixedLenOnly";
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+      run(
           "create table dfs.tmp.%s as\n" +
               "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
                      "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1170,9 +1171,9 @@ public class TestParquetWriter extends BaseTestQuery {
             new BigDecimal("123456.789123456789"))
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1180,9 +1181,9 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testWriteDecimalBinaryOnly() throws Exception {
     String tableName = "decimalBinaryOnly";
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+      run(
         "create table dfs.tmp.%s as\n" +
             "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
                    "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
@@ -1202,9 +1203,9 @@ public class TestParquetWriter extends BaseTestQuery {
             new BigDecimal("123456.789123456789"))
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1228,9 +1229,9 @@ public class TestParquetWriter extends BaseTestQuery {
     fixedLen.add(new BigDecimal("0.000000"));
 
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+      run(
         "create table dfs.tmp.%s as\n" +
             "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
       checkTableTypes(tableName,
@@ -1247,9 +1248,9 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(ints, longs, fixedLen, fixedLen)
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1273,9 +1274,9 @@ public class TestParquetWriter extends BaseTestQuery {
     fixedLen.add(new BigDecimal("0.000000"));
 
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+      run(
           "create table dfs.tmp.%s as\n" +
               "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
       checkTableTypes(tableName,
@@ -1292,9 +1293,9 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(ints, longs, fixedLen, fixedLen)
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1317,9 +1318,9 @@ public class TestParquetWriter extends BaseTestQuery {
     fixedLen.add(new BigDecimal("-999999999999.999999"));
     fixedLen.add(new BigDecimal("0.000000"));
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
-      alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
-      test(
+      client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+      client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+      run(
         "create table dfs.tmp.%s as\n" +
             "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
       checkTableTypes(tableName,
@@ -1336,9 +1337,9 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(ints, longs, fixedLen, fixedLen)
           .go();
     } finally {
-      resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
-      resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+      client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -1346,7 +1347,7 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testCtasForList() throws Exception {
     String tableName = "testCtasForList";
     try {
-      test("CREATE TABLE `%s`.`%s` AS SELECT l FROM cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName);
+      run("CREATE TABLE `%s`.`%s` AS SELECT l FROM cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName);
       testBuilder()
           .sqlQuery("SELECT * FROM `%s`.`/%s` LIMIT 1", DFS_TMP_SCHEMA, tableName)
           .unOrdered()
@@ -1354,7 +1355,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(asList(4L, 2L))
           .go();
     } finally {
-      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+      run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
     }
   }
 
@@ -1362,7 +1363,7 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testCtasForRepeatedList() throws Exception {
     String tableName = "testCtasForRepeatedList";
     try {
-      test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName);
+      run("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName);
       testBuilder()
           .sqlQuery("SELECT rl FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
           .unOrdered()
@@ -1371,7 +1372,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(asList(asList(9L, 7L), asList(4L, 8L)))
           .go();
     } finally {
-      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+      run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
     }
   }
 
@@ -1379,7 +1380,7 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testCtasForRepeatedListOfMaps() throws Exception {
     String tableName = "testCtasForRepeatedListOfMaps";
     try {
-      test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName);
+      run("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName);
       testBuilder()
           .sqlQuery("SELECT * FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
           .unOrdered()
@@ -1394,7 +1395,7 @@ public class TestParquetWriter extends BaseTestQuery {
           ))
           .go();
     } finally {
-      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+      run("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
     }
   }
 
@@ -1402,8 +1403,8 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testCTASWithDictInSelect() throws Exception {
     String tableName = "table_with_dict";
     try {
-      test("use dfs.tmp");
-      test("create table %s as select id, mapcol from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
+      run("use dfs.tmp");
+      run("create table %s as select id, mapcol from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
       testBuilder()
           .sqlQuery("select * from %s", tableName)
           .unOrdered()
@@ -1415,7 +1416,7 @@ public class TestParquetWriter extends BaseTestQuery {
           .baselineValues(2, TestBuilder.mapOfObject("a", 1, "b", 2, "c", 3))
           .go();
     } finally {
-      test("DROP TABLE IF EXISTS %s", tableName);
+      run("DROP TABLE IF EXISTS %s", tableName);
     }
   }
 
@@ -1423,8 +1424,8 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testCTASWithRepeatedDictInSelect() throws Exception {
     String tableName = "table_with_dict_array";
     try {
-      test("use dfs.tmp");
-      test("create table %s as select id, map_array from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
+      run("use dfs.tmp");
+      run("create table %s as select id, map_array from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
       testBuilder()
           .sqlQuery("select * from %s", tableName)
           .unOrdered()
@@ -1471,7 +1472,44 @@ public class TestParquetWriter extends BaseTestQuery {
           )
           .go();
     } finally {
-      test("DROP TABLE IF EXISTS %s", tableName);
+      run("DROP TABLE IF EXISTS %s", tableName);
+    }
+  }
+
+  @Test
+  public void testFormatConfigOpts() throws Exception {
+    FileSystemConfig pluginConfig = (FileSystemConfig) cluster.storageRegistry().copyConfig("dfs");
+    FormatPluginConfig backupConfig = pluginConfig.getFormats().get("parquet");
+
+    cluster.defineFormat("dfs", "parquet", new ParquetFormatConfig(
+        false,
+        true,
+        123,
+        456,
+        true,
+        "snappy",
+        "binary",
+        true,
+        "v2"
+      )
+    );
+
+    try {
+      String query = "select * from dfs.`parquet/int96_dict_change`";
+      queryBuilder()
+        .sql(query)
+        .jsonPlanMatcher()
+        .include("\"autoCorrectCorruptDates\" : false")
+        .include("\"enableStringsSignedMinMax\" : true")
+        .include("\"blockSize\" : 123")
+        .include("\"pageSize\" : 456")
+        .include("\"useSingleFSBlock\" : true")
+        .include("\"writerCompressionType\" : \"snappy\"")
+        .include("\"writerUsePrimitivesForDecimals\" : true")
+        .include("\"writerFormatVersion\" : \"v2\"")
+        .match();
+    } finally {
+      cluster.defineFormat("dfs", "parquet", backupConfig);
     }
   }
 
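Note: the positional constructor arguments in testFormatConfigOpts are opaque
on their own; the JSON keys asserted by the plan matcher above, together with
the parameter list checked in TestFormatPluginOptionExtractor below, suggest
the following mapping. A sketch only, with parameter names inferred rather
than taken from ParquetFormatConfig's source:

    ParquetFormatConfig v2Config = new ParquetFormatConfig(
        false,     // autoCorrectCorruptDates
        true,      // enableStringsSignedMinMax
        123,       // blockSize
        456,       // pageSize
        true,      // useSingleFSBlock
        "snappy",  // writerCompressionType
        "binary",  // writerLogicalTypeForDecimals (inferred; not plan-matched above)
        true,      // writerUsePrimitivesForDecimals
        "v2"       // writerFormatVersion
    );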
@@ -1493,15 +1531,26 @@ public class TestParquetWriter extends BaseTestQuery {
 
     for (Pair<String, PrimitiveType.PrimitiveTypeName> nameType : columnsToCheck) {
       assertEquals(
-          String.format("Table %s does not contain column %s with type %s",
-              tableName, nameType.getKey(), nameType.getValue()),
           nameType.getValue(),
-          schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName());
+          schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName(),
+          String.format(
+            "Table %s does not contain column %s with type %s",
+            tableName,
+            nameType.getKey(),
+            nameType.getValue()
+          )
+        );
 
       assertEquals(
-        String.format("Table %s %s column %s with DECIMAL type", tableName,
-            isDecimalType ? "does not contain" : "contains unexpected", nameType.getKey()),
-        isDecimalType, schema.getType(nameType.getKey()).getOriginalType() == OriginalType.DECIMAL);
+        isDecimalType,
+        schema.getType(nameType.getKey()).getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation,
+        String.format(
+          "Table %s %s column %s with DECIMAL type",
+          tableName,
+          isDecimalType ? "does not contain" : "contains unexpected",
+          nameType.getKey()
+        )
+      );
     }
   }
 }
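Note: the reworked checkTableTypes assertions above compare against the
written file's footer schema and now detect decimal columns via
LogicalTypeAnnotation rather than the deprecated OriginalType. A hedged sketch
of obtaining such a schema with parquet-hadoop; readFooter is the classic
(deprecated but still present) API and the file path is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;

    // Read the footer of a file the writer produced and inspect one column.
    ParquetMetadata footer = ParquetFileReader.readFooter(
        new Configuration(), new Path("/tmp/drill/decimalBinaryOnly/0_0_0.parquet"));
    MessageType schema = footer.getFileMetaData().getSchema();
    // Matches the new assertion style in checkTableTypes:
    boolean isDecimal = schema.getType("decInt").getLogicalTypeAnnotation()
        instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;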
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 2889b71..d9848b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -54,7 +54,14 @@ public class TestFormatPluginOptionExtractor extends BaseTest {
           assertEquals("(type: String, name: String)", d.presentParams());
           break;
         case "parquet":
-          assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
+          assertEquals(
+            d.typeName,
+            "(type: String, autoCorrectCorruptDates: boolean, enableStringsSignedMinMax: boolean, " +
+              "blockSize: Integer, pageSize: Integer, useSingleFSBlock: Boolean, writerCompressionType: String, " +
+              "writerLogicalTypeForDecimals: String, writerUsePrimitivesForDecimals: Boolean, " +
+              "writerFormatVersion: String)",
+            d.presentParams()
+          );
           break;
         case "json":
           assertEquals(d.typeName, "(type: String)", d.presentParams());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
index 6b6341a..f44a56a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
@@ -22,6 +22,7 @@ import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.test.BaseTest;
 import org.apache.hadoop.conf.Configuration;
@@ -97,28 +98,32 @@ public class TestParquetReaderConfig extends BaseTest {
   @Test
   public void testPriorityAssignmentForStringsSignedMinMax() throws Exception {
     @SuppressWarnings("resource")
-    SystemOptionManager options = new SystemOptionManager(DrillConfig.create()).init();
+    SystemOptionManager sysOpts = new SystemOptionManager(DrillConfig.create()).init();
+    SessionOptionManager sessOpts = new SessionOptionManager(sysOpts, null);
 
     // use value from format config
     ParquetFormatConfig formatConfig = new ParquetFormatConfig();
    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
-    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+    assertEquals(formatConfig.isEnableStringsSignedMinMax(), readerConfig.enableStringsSignedMinMax());
 
     // change format config value
-    formatConfig = new ParquetFormatConfig(true, true);
+    formatConfig = ParquetFormatConfig.builder()
+      .autoCorrectCorruptDates(true)
+      .enableStringsSignedMinMax(true)
+      .build();
+
    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
-    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+    assertEquals(formatConfig.isEnableStringsSignedMinMax(), readerConfig.enableStringsSignedMinMax());
 
-    // set option, option value should have higher priority
-    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+    // set system option, option value should not have higher priority
+    sysOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(sysOpts).build();
+    assertTrue(readerConfig.enableStringsSignedMinMax());
 
-    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+    // set session option, option value should have higher priority
+    sessOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(sessOpts).build();
     assertFalse(readerConfig.enableStringsSignedMinMax());
-
-    // set option as empty (undefined), config should have higher priority
-    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "");
-    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
-    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
   }
 
 
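Note: the rewritten priority test above pins down the intended precedence
chain: a session-scoped option overrides the format config, a bare system
option does not, and the old empty-string "undefined" case is gone. Condensed
into one sketch (sysOpts, sessOpts and formatConfig as constructed in the
test):

    // Effective precedence: session option > format config > system option.
    sysOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
    assertTrue(ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig).withOptions(sysOpts).build()
        .enableStringsSignedMinMax());    // format config (true) still wins

    sessOpts.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
    assertFalse(ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig).withOptions(sessOpts).build()
        .enableStringsSignedMinMax());    // session option wins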
@@ -128,4 +133,4 @@ public class TestParquetReaderConfig extends BaseTest {
     assertEquals(expectedValue, actualValue);
   }
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
index 1afd3b3..8cae668 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
@@ -112,7 +113,9 @@ public class TestVarlenDecimal extends ClusterTest {
               "select cast('%s' as decimal(36, 9)) dec36", tableName, bigDecimalValue);
 
       String json = FileUtils.readFileToString(
-          Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.json").toFile());
+        Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.json").toFile(),
+        Charset.defaultCharset()
+      );
 
       Assert.assertThat(json, CoreMatchers.containsString(bigDecimalValue));
 
@@ -142,7 +145,9 @@ public class TestVarlenDecimal extends ClusterTest {
               "select cast('%s' as decimal(36, 9)) dec36", tableName, bigDecimalValue);
 
       String csv = FileUtils.readFileToString(
-          Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.csvh").toFile());
+        Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.csvh").toFile(),
+        Charset.defaultCharset()
+      );
 
       Assert.assertThat(csv, CoreMatchers.containsString(bigDecimalValue));
 
