ayushtkn commented on code in PR #4952:
URL: https://github.com/apache/hive/pull/4952#discussion_r1431774350


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {

Review Comment:
   Missing `@Override` annotation on this method.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {

Review Comment:
   Missing `@Override` annotation on this method.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {
+        FieldSchema keySchema = keys.get(i);
+        query.append(i == 0 ? "`" : " and 
`").append(keySchema.getName()).append("`=");
+        if 
(!keySchema.getType().equalsIgnoreCase(ColumnType.BOOLEAN_TYPE_NAME)) {
+          query.append("'").append(vals.get(i)).append("'");
+        } else {
+          query.append(vals.get(i));
+        }
+      }
+    }
+  }

Review Comment:
   This code is duplicated in both this class and CompactionQueryBuilderForMajor;
   please refactor it so that it lives in one place only.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {
+        FieldSchema keySchema = keys.get(i);
+        query.append(i == 0 ? "`" : " and 
`").append(keySchema.getName()).append("`=");
+        if 
(!keySchema.getType().equalsIgnoreCase(ColumnType.BOOLEAN_TYPE_NAME)) {
+          query.append("'").append(vals.get(i)).append("'");
+        } else {
+          query.append(vals.get(i));
+        }
+      }
+    }
+  }
+
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    getMmBucketing(query);
+
+    // SKEWED BY
+    getSkewedByClause(query);
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    copySerdeFromSourceTable(query);
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION 
'").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query);
+  }
+
+  /**
+   * Define columns of the create query.
+   */
+  private void defineColumns(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+    }
+    query.append("(");
+    List<String> columnDescs = getColumnDescs();
+    query.append(StringUtils.join(columnDescs, ','));
+    query.append(") ");
+  }
+
+  /**
+   * Part of Create operation. Copy source table bucketing for insert-only 
compaction.
+   */
+  private void getMmBucketing(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+    }
+    boolean isFirst;
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, 
",")).append(") ");
+      List<Order> sortCols = sourceTab.getSd().getSortCols();
+      if (sortCols.size() > 0) {

Review Comment:
   Change to `if (!sortCols.isEmpty()) {`



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }

Review Comment:
   Can't we move this logic inside the switch cases? As written, this separate
   `if` check looks redundant.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {
+        FieldSchema keySchema = keys.get(i);
+        query.append(i == 0 ? "`" : " and 
`").append(keySchema.getName()).append("`=");
+        if 
(!keySchema.getType().equalsIgnoreCase(ColumnType.BOOLEAN_TYPE_NAME)) {
+          query.append("'").append(vals.get(i)).append("'");
+        } else {
+          query.append(vals.get(i));
+        }
+      }
+    }
+  }
+
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    getMmBucketing(query);
+
+    // SKEWED BY
+    getSkewedByClause(query);
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    copySerdeFromSourceTable(query);
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION 
'").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query);
+  }
+
+  /**
+   * Define columns of the create query.
+   */
+  private void defineColumns(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+    }
+    query.append("(");
+    List<String> columnDescs = getColumnDescs();
+    query.append(StringUtils.join(columnDescs, ','));
+    query.append(") ");
+  }
+
+  /**
+   * Part of Create operation. Copy source table bucketing for insert-only 
compaction.
+   */
+  private void getMmBucketing(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+    }
+    boolean isFirst;
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols.size() > 0) {

Review Comment:
   Change to `if (!buckCols.isEmpty()) {`



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {

Review Comment:
   `keys.size()` is computed twice, here and in the check above:
   ```
    if (keys.size() != vals.size()) {
   ```
   Consider extracting it into a local variable and using it in both places.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {

Review Comment:
   Missing `@Override` annotation.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds query strings that help with query-based MINOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMinor extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName());
+
+  /**
+   * Construct a CompactionQueryBuilderMinor with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null
+   */
+  CompactionQueryBuilderForMinor(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, false, resultTableName);
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    query.append("`operation`, `originalTransaction`, `bucket`, `rowId`, 
`currentTransaction`, `row`");
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (validWriteIdList != null) {
+      if (validWriteIdList != null) {

Review Comment:
   Why is the `validWriteIdList != null` check there twice?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds query strings that help with query-based MINOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMinor extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName());
+
+  /**
+   * Construct a CompactionQueryBuilderMinor with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null
+   */
+  CompactionQueryBuilderForMinor(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, false, resultTableName);
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {

Review Comment:
   Missing `@Override` annotation here; please add it to all the other
   overridden methods as well.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only 
tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation      query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null or the 
compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, true, resultTableName);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
+    }
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
|| CompactionType.MINOR.equals(
+        compactionType)) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+    switch (compactionType) {
+    case MAJOR: {
+      if (sourcePartition != null) { //mmmajor and partitioned
+        appendColumns(query, cols, false);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    }
+    case MINOR: {
+      appendColumns(query, cols, false);
+    }
+    }
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+    if (CompactionType.MAJOR.equals(compactionType) && 
StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {
+        FieldSchema keySchema = keys.get(i);
+        query.append(i == 0 ? "`" : " and 
`").append(keySchema.getName()).append("`=");
+        if 
(!keySchema.getType().equalsIgnoreCase(ColumnType.BOOLEAN_TYPE_NAME)) {
+          query.append("'").append(vals.get(i)).append("'");
+        } else {
+          query.append(vals.get(i));
+        }
+      }
+    }
+  }
+
+  protected void getDdlForCreate(StringBuilder query) {

Review Comment:
   Add the `@Override` annotation.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds query strings that help with query-based MAJOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMajor extends CompactionQueryBuilder {

Review Comment:
   +1



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds query strings that help with query-based MINOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMinor extends CompactionQueryBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName());
+
+  /**
+   * Construct a CompactionQueryBuilderMinor with required params.
+   *
+   * @param compactionType major or minor or rebalance, e.g. 
CompactionType.MAJOR.
+   *                       Cannot be null.
+   * @param operation query's Operation e.g. Operation.CREATE.
+   * @throws IllegalArgumentException if compactionType is null
+   */
+  CompactionQueryBuilderForMinor(CompactionType compactionType, Operation 
operation, String resultTableName) {
+    super(compactionType, operation, false, resultTableName);
+  }
+
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    query.append("`operation`, `originalTransaction`, `bucket`, `rowId`, 
`currentTransaction`, `row`");
+  }
+
+  protected void getSourceForInsert(StringBuilder query) {
+    if (sourceTabForInsert != null) {
+      query.append(sourceTabForInsert);
+    } else {
+      
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
+    }
+    query.append(" ");
+  }
+
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (validWriteIdList != null) {
+      if (validWriteIdList != null) {
+        long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
+        if (invalidWriteIds.length > 0) {
+          query.append(" where `originalTransaction` not in (")
+              .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), 
",")).append(")");
+        }
+      }
+    }
+  }
+
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    int bucketingVersion = 0;
+    if (CompactionType.MINOR.equals(compactionType)) {
+      bucketingVersion = getMinorCrudBucketing(query, bucketingVersion);
+    }
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    query.append(" stored as orc");
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION 
'").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query, bucketingVersion);
+  }
+
+  /**
+   * Define columns of the create query.
+   */
+  private void defineColumns(StringBuilder query) {
+    if (sourceTab == null) {

Review Comment:
   Why are we doing this null check and early return here and in other places?
   Can't we wrap our logic within `if (sourceTab != null)` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to