This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e3552  HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
a4e3552 is described below

commit a4e3552e9eff53dd44d3aaae4f91ad2fb89cec6e
Author: Bodor Laszlo <[email protected]>
AuthorDate: Tue Sep 14 11:07:11 2021 +0200

    HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   7 +
 .../hive/ql/exec/SerializationUtilities.java       | 148 +++++++++++----
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   2 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    |   1 -
 .../hive/ql/exec/TestSerializationUtilities.java   | 199 +++++++++++++++++++++
 5 files changed, 318 insertions(+), 39 deletions(-)
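
For context, a minimal end-to-end sketch of the new knob (illustrative only, not
part of the patch: SkipPropsUsageSketch is a made-up name and the empty MapWork
stands in for a real compiled plan):

    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    import com.esotericsoftware.kryo.Kryo;

    public class SkipPropsUsageSketch {
      public static void main(String[] args) {
        // Drop the exact property "rawDataSize" and any property matching the
        // regex "impala_.*chunk.*" (an asterisk switches a word to regex mode).
        Configuration conf = new Configuration();
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES,
            "rawDataSize,impala_.*chunk.*");

        MapWork mapWork = new MapWork(); // a real plan would come from the compiler
        Kryo kryo = SerializationUtilities.borrowKryo(conf); // conf travels with the pooled instance
        try {
          SerializationUtilities.serializePlan(kryo, mapWork, new ByteArrayOutputStream());
        } finally {
          SerializationUtilities.releaseKryo(kryo); // resets the conf before returning it to the pool
        }
      }
    }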

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb5b981..f61b903 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4524,6 +4524,13 @@ public class HiveConf extends Configuration {
 
     HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false,
         "Whether to send the query plan via local resource or RPC"),
+    HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES("hive.plan.mapwork.serialization.skip.properties", "",
+        "Comma separated list of properties which are not needed at execution time, so they can be removed "
+            + "from PartitionDesc properties before serialization; the config can contain exact strings and regex "
+            + "expressions, and regex mode is activated if at least 1 asterisk (*) is present in the current word: "
+            + "rawDataSize                exact string match, removes only the rawDataSize property"
+            + ".*Size                     regex match, removes every property ending with 'Size'"
+            + "numRows,impala_.*chunk.*   comma separated and mixed (handles strings and regexes at the same time)"),
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
         "Whether to generate the splits locally or in the AM (tez only)"),
     
HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits",
 true,
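
Before the implementation below, a self-contained sketch of the exact-vs-regex
semantics the description promises (the sample property names are illustrative;
the loop mirrors filterMapworkProperties in the next file):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.Properties;
    import java.util.regex.Pattern;

    public class SkipPropsMatchingDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("rawDataSize", "10");
        props.put("impala_intermediate_stats_chunk1", "x");
        props.put("numRows", "42");

        // "rawDataSize" has no '*', so it is an exact match;
        // "impala_.*chunk.*" contains '*', which activates regex mode.
        for (String prop : "rawDataSize,impala_.*chunk.*".split(",")) {
          if (prop.contains("*")) {
            Pattern pattern = Pattern.compile(prop);
            Iterator<Map.Entry<Object, Object>> it = props.entrySet().iterator();
            while (it.hasNext()) {
              if (pattern.matcher((String) it.next().getKey()).find()) {
                it.remove(); // drops impala_intermediate_stats_chunk1
              }
            }
          } else {
            props.remove(prop); // drops rawDataSize
          }
        }
        System.out.println(props); // prints {numRows=42}
      }
    }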
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index d4e8407..56e1b32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -31,15 +31,22 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
@@ -60,6 +67,7 @@ import com.esotericsoftware.kryo.Registration;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.util.Pool;
+import com.google.common.annotations.VisibleForTesting;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 
 /**
@@ -111,8 +119,10 @@ public class SerializationUtilities {
   /**
    * Provides general-purpose hooks for specific types, as well as a global hook.
    */
-  private static class KryoWithHooks extends Kryo {
+  private static class KryoWithHooks extends Kryo implements Configurable {
     private Hook globalHook;
+    // this should be set on-the-fly after borrowing this instance and needs to be reset on release
+    private Configuration configuration;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
    private static final class SerializerWithHook extends com.esotericsoftware.kryo.Serializer {
@@ -206,6 +216,17 @@ public class SerializationUtilities {
       T result = super.readObject(input, type, serializer);
       return ponderGlobalPostReadHook(hook, result);
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.configuration = conf;
+
+    }
+
+    @Override
+    public Configuration getConf() {
+      return configuration;
+    }
   }
 
   private static final Object FAKE_REFERENCE = new Object();
@@ -234,6 +255,7 @@ public class SerializationUtilities {
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
    kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new ArrayListSubListSerializer());
    kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
+    kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class));
    kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class));
 
     ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
@@ -254,8 +276,13 @@ public class SerializationUtilities {
    * @return kryo instance
    */
   public static Kryo borrowKryo() {
+    return borrowKryo(null);
+  }
+
+  public static Kryo borrowKryo(Configuration configuration) {
     Kryo kryo = kryoPool.obtain();
     kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    ((KryoWithHooks) kryo).setConf(configuration);
     return kryo;
   }
 
@@ -265,6 +292,9 @@ public class SerializationUtilities {
    * @param kryo - kryo instance to be released
    */
   public static void releaseKryo(Kryo kryo) {
+    if (kryo != null){
+      ((KryoWithHooks) kryo).setConf(null);
+    }
     kryoPool.free(kryo);
   }
 
@@ -542,6 +572,70 @@ public class SerializationUtilities {
   }
 
   /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects, e.g. in
+   * order to remove useless properties at execution time.
+   */
+  private static class MapWorkSerializer extends FieldSerializer<MapWork> {
+
+    public MapWorkSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, MapWork mapWork) {
+      filterMapworkProperties(kryo, mapWork);
+      super.write(kryo, output, mapWork);
+    }
+
+    private void filterMapworkProperties(Kryo kryo, MapWork mapWork) {
+      Configuration configuration = ((KryoWithHooks) kryo).getConf();
+      if (configuration == null || HiveConf
+          .getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).isEmpty()) {
+        return;
+      }
+      String[] filterProps =
+          HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).split(",");
+      for (String prop : filterProps) {
+        boolean isRegex = isRegex(prop);
+        Pattern pattern = Pattern.compile(prop);
+
+        LOG.debug("Trying to filter MapWork properties (regex: " + isRegex + "): " + prop);
+
+        for (Entry<Path, PartitionDesc> partDescEntry : mapWork.getPathToPartitionInfo().entrySet()) {
+          /*
+           * remove by regex, could be a bit more expensive because of iterating and matching regexes
+           * e.g.: in case of impala_intermediate_stats_chunk1, impala_intermediate_stats_chunk2, user only needs to
+           * configure impala_intermediate_stats_chunk.*
+           */
+          if (isRegex) {
+            Iterator<Entry<Object, Object>> itProps =
+                partDescEntry.getValue().getProperties().entrySet().iterator();
+            while (itProps.hasNext()) {
+              Map.Entry<Object, Object> entry = itProps.next();
+              String actualProp = (String) entry.getKey();
+              Matcher matcher = pattern.matcher(actualProp);
+
+              if (matcher.find()) {
+                LOG.debug("Removed '{}' from MapWork (partition: {})", actualProp, partDescEntry.getKey());
+                itProps.remove();
+              }
+            }
+          } else {
+            Object objRemoved = partDescEntry.getValue().getProperties().remove(prop);
+            if (objRemoved != null) {
+              LOG.debug("Removed '{}' from MapWork (partition: {})", prop, partDescEntry.getKey());
+            }
+          }
+        }
+      }
+    }
+
+    private boolean isRegex(String prop) {
+      return prop.contains("*");
+    }
+  }
+
+  /**
   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc} objects
   * in order to invoke any string interning code present in the "setter" methods. {@link
   * PartitionDesc} objects are usually stored by {@link MapWork} objects and contain duplicate info
@@ -573,31 +667,24 @@ public class SerializationUtilities {
    * @param out  The stream to write to.
    */
   public static void serializePlan(Object plan, OutputStream out) {
-    serializePlan(plan, out, false);
-  }
-
-  public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
-    serializePlan(kryo, plan, out, false);
+    serializePlan(plan, out, null);
   }
 
-  private static void serializePlan(Object plan, OutputStream out, boolean cloningPlan) {
-    Kryo kryo = borrowKryo();
+  @VisibleForTesting
+  static void serializePlan(Object plan, OutputStream out, Configuration configuration) {
+    Kryo kryo = borrowKryo(configuration);
     try {
-      serializePlan(kryo, plan, out, cloningPlan);
+      serializePlan(kryo, plan, out);
     } finally {
       releaseKryo(kryo);
     }
   }
 
-  private static void serializePlan(Kryo kryo, Object plan, OutputStream out, boolean cloningPlan) {
+  public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
     LOG.info("Serializing " + plan.getClass().getSimpleName() + " using kryo");
-    if (cloningPlan) {
-      serializeObjectByKryo(kryo, plan, out);
-    } else {
-      serializeObjectByKryo(kryo, plan, out);
-    }
+    serializeObjectByKryo(kryo, plan, out);
     perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
   }
 
@@ -609,35 +696,22 @@ public class SerializationUtilities {
    * @return The plan, such as QueryPlan, MapredWork, etc.
    */
   public static <T> T deserializePlan(InputStream in, Class<T> planClass) {
-    return deserializePlan(in, planClass, false);
-  }
-
-  public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
-    return deserializePlan(kryo, in, planClass, false);
-  }
-
-  private static <T> T deserializePlan(InputStream in, Class<T> planClass, boolean cloningPlan) {
     Kryo kryo = borrowKryo();
     T result = null;
     try {
-      result = deserializePlan(kryo, in, planClass, cloningPlan);
+      result = deserializePlan(kryo, in, planClass);
     } finally {
       releaseKryo(kryo);
     }
     return result;
   }
 
-  private static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
-      boolean cloningPlan) {
+  public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
     T plan;
     LOG.info("Deserializing " + planClass.getSimpleName() + " using kryo");
-    if (cloningPlan) {
-      plan = deserializeObjectByKryo(kryo, in, planClass);
-    } else {
-      plan = deserializeObjectByKryo(kryo, in, planClass);
-    }
+    plan = deserializeObjectByKryo(kryo, in, planClass);
     perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
     return plan;
   }
@@ -654,9 +728,9 @@ public class SerializationUtilities {
     Operator<?> op = plan.getAnyOperator();
    CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    serializePlan(plan, baos, true);
+    serializePlan(plan, baos);
    MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        MapredWork.class, true);
+        MapredWork.class);
     // Restore the context.
     for (Operator<?> newOp : newPlan.getAllOperators()) {
       newOp.setCompilationOpContext(ctx);
@@ -676,11 +750,11 @@ public class SerializationUtilities {
     }
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
     CompilationOpContext ctx = roots.get(0).getCompilationOpContext();
-    serializePlan(roots, baos, true);
+    serializePlan(roots, baos);
     @SuppressWarnings("unchecked")
     List<Operator<?>> result =
         deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-            roots.getClass(), true);
+            roots.getClass());
     // Restore the context.
     LinkedList<Operator<?>> newOps = new LinkedList<>(result);
     while (!newOps.isEmpty()) {
@@ -705,9 +779,9 @@ public class SerializationUtilities {
     Operator<?> op = plan.getAnyRootOperator();
    CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    serializePlan(plan, baos, true);
+    serializePlan(plan, baos);
    BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        plan.getClass(), true);
+        plan.getClass());
     // Restore the context.
     for (Operator<?> newOp : newPlan.getAllOperators()) {
       newOp.setCompilationOpContext(ctx);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 01f0967..7474603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -582,7 +582,7 @@ public final class Utilities {
   }
 
  private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
-    Kryo kryo = SerializationUtilities.borrowKryo();
+    Kryo kryo = SerializationUtilities.borrowKryo(conf);
     try {
       setPlanPath(conf, hiveScratchDir);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 729d2fc..dbbd8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
new file mode 100644
index 0000000..77c0997
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSerializationUtilities {
+
+  @Test
+  public void testEveryPropertiesAreSerialized() throws Exception {
+    MapWork mapWork = doSerDeser(null);
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testRegexFilterAll() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig(".*"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+  }
+
+  @Test
+  public void testRegexFilterSomeProps() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testString() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("rawDataSize"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testStringAndBadRegex() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk,rawDataSize"));
+
+    // impala_intermediate_stats_chunk props are not filtered in this case because only "*" activates regex mode
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testStringRegexMixed() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+  }
+
+  @Test
+  public void testSkippingAppliesToAllPartitions() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "serialization.ddl", false);
+  }
+
+  private MapWork doSerDeser(Configuration configuration) throws Exception, IOException {
+    MapWork mapWork = mockMapWorkWithSomePartitionDescProperties();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    SerializationUtilities.serializePlan(mapWork, baos, configuration);
+    InputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    MapWork mapWorkDeserialized = SerializationUtilities.deserializePlan(bais, MapWork.class);
+    baos.close();
+    bais.close();
+    return mapWorkDeserialized;
+  }
+
+  private Configuration getConfWithSkipConfig(String value) {
+    Configuration configuration = new Configuration();
+    HiveConf.setVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES, value);
+    return configuration;
+  }
+
+  private void assertPartitionDescPropertyPresence(MapWork mapWork, String partitionPath, String prop,
+      boolean isPresent) {
+    String value = mapWork.getPathToPartitionInfo().get(new Path(partitionPath)).getProperties().getProperty(prop);
+    Assert.assertTrue(String.format("'%s' is%ssupposed to be present", prop, (isPresent ? " " : " not ")),
+        isPresent ? value != null : value == null);
+  }
+
+  private static MapWork mockMapWorkWithSomePartitionDescProperties() throws Exception {
+    String tableName = "test_table";
+    int numPartitions = 2;
+    Path root = new Path("/warehouse", "test_table");
+
+    String[] partPath = new String[numPartitions];
+    StringBuilder buffer = new StringBuilder();
+    for (int p = 0; p < numPartitions; ++p) {
+      partPath[p] = new Path(root, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+
+    Properties tblProps = new Properties();
+    TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps);
+
+    MapWork mapWork = new MapWork();
+
+    Map<Path, List<String>> aliasMap = new LinkedHashMap<>();
+    List<String> aliases = new ArrayList<String>();
+    aliases.add(tableName);
+
+    LinkedHashMap<Path, PartitionDesc> partMap = new LinkedHashMap<>();
+    for (int p = 0; p < numPartitions; ++p) {
+      Path path = new Path(partPath[p]);
+      aliasMap.put(path, aliases);
+      LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tbl, partSpec);
+      part.setVectorPartitionDesc(
+          VectorPartitionDesc.createVectorizedInputFileFormat("MockInputFileFormatClassName", false, null));
+
+      part.getProperties().put("impala_intermediate_stats_chunk1", "asdfghjk12345678");
+      part.getProperties().put("impala_intermediate_stats_chunk2", "asdfghjk12345678");
+      part.getProperties().put("rawDataSize", "10");
+      part.getProperties().put("serialization.ddl", "asdf");
+
+      partMap.put(path, part);
+    }
+    mapWork.setPathToAliases(aliasMap);
+    mapWork.setPathToPartitionInfo(partMap);
+
+    return mapWork;
+  }
+}
