VenuReddy2103 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1071755797


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -515,6 +529,803 @@ public List<String> 
getMaterializedViewsForRewriting(String dbName) throws MetaE
     }
   }
 
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException { // returns a store-manager generated value for modelClass
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = 
ec.getMetaDataManager().getMetaDataForClass(modelClass, 
ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) { // any other identity type is rejected below
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, 
cmd, -1); // NOTE(review): -1 presumably selects the class-level identity rather than a field - confirm
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
+  /**
+   * Interface to execute multiple row insert query in batch for direct SQL.
+   * insertInBatch calls execute once per batch with the query text for that batch,
+   * the number of rows in it (batchRowCount) and the total number of bind
+   * parameters it needs (batchParamCount, i.e. rows multiplied by columns).
+   */
+  interface BatchExecutionContext {
+    void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException;
+  }
+  /**
+   * Splits rowCount rows into multi-row insert statements and hands each one to bec.
+   * Returns without doing anything when rowCount or columnCount is 0. The number of
+   * rows per statement is maxParamsInInsert / columnCount; when maxParamsInInsert is
+   * smaller than columnCount an error is logged and one row is inserted per statement.
+   * The query text for full batches is built once with dbType.getBatchInsertQuery and
+   * reused; a separate, smaller query is built for the remaining rows, if any.
+   * @param tableName table to insert into
+   * @param columns column list, passed through to dbType.getBatchInsertQuery
+   * @param columnCount number of columns, i.e. bind parameters per row
+   * @param rowFormat placeholder text for one row, passed through to dbType.getBatchInsertQuery
+   * @param rowCount total number of rows to insert
+   * @param bec callback that binds the parameters of one batch and runs it
+   * @throws MetaException declared by bec.execute
+   */
+  private void insertInBatch(String tableName, String columns, int 
columnCount, String rowFormat, int rowCount,
+      BatchExecutionContext bec) throws MetaException {
+    if (rowCount == 0 || columnCount == 0) {
+      return;
+    }
+    int maxParamsCount = maxParamsInInsert;
+    if (maxParamsCount < columnCount) {
+      LOG.error("Maximum number of parameters in the direct SQL batch insert 
query is less than the table: {}"
+          + " columns. Executing single row insert queries.", tableName);
+      maxParamsCount = columnCount; // makes maxRowsInBatch below evaluate to 1
+    }
+    int maxRowsInBatch = maxParamsCount / columnCount;
+    int maxBatches = rowCount / maxRowsInBatch; // number of full batches
+    int last = rowCount % maxRowsInBatch; // rows left over after the full batches
+    String query = "";
+    if (maxBatches > 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, 
maxRowsInBatch);
+    }
+    int batchParamCount = maxRowsInBatch * columnCount;
+    for (int batch = 0; batch < maxBatches; batch++) {
+      bec.execute(query, maxRowsInBatch, batchParamCount); // same query text for every full batch
+    }
+    if (last != 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+      bec.execute(query, last, last * columnCount);
+    }
+  }
+  /**
+   * Inserts one row into the SERDES table for every entry of serdeIdToSerDeInfo,
+   * batching the rows through insertInBatch. The map key is bound to SERDE_ID and the
+   * other six columns are read from the MSerDeInfo getters in column-list order.
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param serdeIdToSerDeInfo serde id mapped to the serde to insert
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) 
throws MetaException {
+    int rowCount = serdeIdToSerDeInfo.size();
+    String columns = 
"(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+        + "\"SERIALIZER_CLASS\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> it = 
serdeIdToSerDeInfo.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MSerDeInfo> entry = it.next();
+          MSerDeInfo serdeInfo = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = serdeInfo.getDescription();
+          params[paramIndex++] = serdeInfo.getDeserializerClass();
+          params[paramIndex++] = serdeInfo.getName();
+          params[paramIndex++] = serdeInfo.getSerdeType();
+          params[paramIndex++] = serdeInfo.getSerializationLib();
+          params[paramIndex++] = serdeInfo.getSerializerClass();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDES, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SDS table for every entry of sdIdToStorageDescriptor,
+   * batching the rows through insertInBatch. The map key is bound to SD_ID; CD_ID and
+   * SERDE_ID are looked up by that same key in sdIdToCdId and sdIdToSerdeId; the two
+   * boolean columns are converted with dbType.getBoolean before binding.
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param sdIdToStorageDescriptor storage descriptor id mapped to the descriptor to insert
+   * @param sdIdToSerdeId storage descriptor id mapped to the value bound to SERDE_ID
+   * @param sdIdToCdId storage descriptor id mapped to the value bound to CD_ID
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> 
sdIdToStorageDescriptor,
+      Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws 
MetaException {
+    int rowCount = sdIdToStorageDescriptor.size();
+    String columns = 
"(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+        + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+    String row = "(?,?,?,?,?,?,?,?,?)";
+    int columnCount = 9;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> it = 
sdIdToStorageDescriptor.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MStorageDescriptor> entry = it.next();
+          MStorageDescriptor sd = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+          params[paramIndex++] = sd.getInputFormat();
+          params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+          params[paramIndex++] = 
dbType.getBoolean(sd.isStoredAsSubDirectories());
+          params[paramIndex++] = sd.getLocation();
+          params[paramIndex++] = sd.getNumBuckets();
+          params[paramIndex++] = sd.getOutputFormat();
+          params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SDS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the PARTITIONS table for every entry of partIdToPartition,
+   * batching the rows through insertInBatch. The map key is bound to PART_ID, SD_ID is
+   * looked up by that key in partIdToSdId, and TBL_ID is read from
+   * partition.getTable().getId().
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param partIdToPartition partition id mapped to the partition to insert
+   * @param partIdToSdId partition id mapped to the value bound to SD_ID
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, 
Map<Long, Long> partIdToSdId)
+      throws MetaException {
+    int rowCount = partIdToPartition.size();
+    String columns = 
"(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+        + "\"WRITE_ID\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> it = 
partIdToPartition.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartition> entry = it.next();
+          MPartition partition = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partition.getCreateTime();
+          params[paramIndex++] = partition.getLastAccessTime();
+          params[paramIndex++] = partition.getPartitionName();
+          params[paramIndex++] = partIdToSdId.get(entry.getKey());
+          params[paramIndex++] = partition.getTable().getId();
+          params[paramIndex++] = partition.getWriteId();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITIONS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SERDE_PARAMS table for every parameter of every serde in
+   * serdeIdToSerDeInfo, batching the rows through insertInBatch. Returns without
+   * issuing a query when the serdes have no parameters at all.
+   * The callback keeps the current serde entry and its parameter iterator as fields.
+   * When the current parameters run out before a batch is full it advances to the next
+   * serde, so a batch may span several serdes and a serde may span several batches.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param serdeIdToSerDeInfo serde id mapped to the serde whose parameters are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> 
serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = 0;
+    for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+      rowCount += serDeInfo.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = 
serdeIdToSerDeInfo.entrySet().iterator();
+      Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+      Iterator<Map.Entry<String, String>> it = 
serdeEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = serdeEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) { // current serde exhausted, batch not yet full
+            serdeEntry = serdeIt.next();
+            it = serdeEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDE_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SD_PARAMS table for every parameter of every storage
+   * descriptor in sdIdToStorageDescriptor, batching the rows through insertInBatch.
+   * Returns without issuing a query when the descriptors have no parameters at all.
+   * The callback keeps the current descriptor entry and its parameter iterator as
+   * fields. When the current parameters run out before a batch is full it advances to
+   * the next descriptor, so a batch may span several descriptors and vice versa.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param sdIdToStorageDescriptor storage descriptor id mapped to the descriptor
+   *        whose parameters are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertStorageDescriptorParamInBatch(Map<Long, 
MStorageDescriptor> sdIdToStorageDescriptor)
+      throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = 
sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<Map.Entry<String, String>> it = 
sdEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) { // current descriptor exhausted, batch not yet full
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SD_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the PARTITION_PARAMS table for every parameter of every
+   * partition in partIdToPartition, batching the rows through insertInBatch.
+   * Returns without issuing a query when the partitions have no parameters at all.
+   * The callback keeps the current partition entry and its parameter iterator as
+   * fields. When the current parameters run out before a batch is full it advances to
+   * the next partition, so a batch may span several partitions and vice versa.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param partIdToPartition partition id mapped to the partition whose parameters
+   *        are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertPartitionParamInBatch(Map<Long, MPartition> 
partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> partIt = 
partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<Map.Entry<String, String>> it = 
partEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) { // current partition exhausted, batch not yet full
+            partEntry = partIt.next();
+            it = partEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the PARTITION_KEY_VALS table for every value of every
+   * partition in partIdToPartition, batching the rows through insertInBatch.
+   * Returns without issuing a query when the partitions have no values at all.
+   * INTEGER_IDX is bound to colIndex, the position of the value within its partition;
+   * it is reset to 0 whenever the callback advances to the next partition and is kept
+   * across batches when a partition spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param partIdToPartition partition id mapped to the partition whose values are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertPartitionKeyValInBatch(Map<Long, MPartition> 
partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getValues().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PART_KEY_VAL\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MPartition>> partIt = 
partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<String> it = partEntry.getValue().getValues().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current partition exhausted, batch not yet full
+            colIndex = 0;
+            partEntry = partIt.next();
+            it = partEntry.getValue().getValues().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_KEY_VALS, columns, columnCount, row, rowCount, 
bec);
+  }
+
+  /**
+   * Inserts one row into the CDS table for every key of cdIdToColumnDescriptor,
+   * batching the rows through insertInBatch. Only the key is bound (to CD_ID); the
+   * map values are not read here.
+   * The key iterator is a field of the anonymous callback, so each call to execute
+   * continues with the keys the previous batch did not consume.
+   * @param cdIdToColumnDescriptor column descriptor id mapped to its descriptor
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertColumnDescriptorInBatch(Map<Long, MColumnDescriptor> 
cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = cdIdToColumnDescriptor.size();
+    String columns = "(\"CD_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = cdIdToColumnDescriptor.keySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(CDS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the COLUMNS_V2 table for every column (MFieldSchema) of
+   * every column descriptor in cdIdToColumnDescriptor, batching the rows through
+   * insertInBatch. Returns without issuing a query when there are no columns at all.
+   * INTEGER_IDX is bound to colIndex, the position of the column within its
+   * descriptor; it is reset to 0 whenever the callback advances to the next descriptor
+   * and is kept across batches when a descriptor spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param cdIdToColumnDescriptor column descriptor id mapped to the descriptor whose
+   *        columns are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertColumnV2InBatch(Map<Long, MColumnDescriptor> 
cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MColumnDescriptor cd : cdIdToColumnDescriptor.values()) {
+      rowCount += cd.getCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = 
"(\"CD_ID\",\"COMMENT\",\"COLUMN_NAME\",\"TYPE_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?,?)";
+    int columnCount = 5;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MColumnDescriptor>> cdIt = 
cdIdToColumnDescriptor.entrySet().iterator();
+      Map.Entry<Long, MColumnDescriptor> cdEntry = cdIt.next();
+      Iterator<MFieldSchema> it = cdEntry.getValue().getCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MFieldSchema fieldSchema = it.next();
+            params[paramIndex++] = cdEntry.getKey();
+            params[paramIndex++] = fieldSchema.getComment();
+            params[paramIndex++] = fieldSchema.getName();
+            params[paramIndex++] = fieldSchema.getType();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current descriptor exhausted, batch not yet full
+            colIndex = 0;
+            cdEntry = cdIt.next();
+            it = cdEntry.getValue().getCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(COLUMNS_V2, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the BUCKETING_COLS table for every bucket column of every
+   * storage descriptor in sdIdToStorageDescriptor, batching the rows through
+   * insertInBatch. Returns without issuing a query when there are no bucket columns.
+   * INTEGER_IDX is bound to colIndex, the position of the column within its
+   * descriptor; it is reset to 0 whenever the callback advances to the next descriptor
+   * and is kept across batches when a descriptor spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param sdIdToStorageDescriptor storage descriptor id mapped to the descriptor
+   *        whose bucket columns are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertBucketColInBatch(Map<Long, MStorageDescriptor> 
sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getBucketCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"BUCKET_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = 
sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getBucketCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current descriptor exhausted, batch not yet full
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getBucketCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(BUCKETING_COLS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SORT_COLS table for every sort column (MOrder) of every
+   * storage descriptor in sdIdToStorageDescriptor, batching the rows through
+   * insertInBatch. Returns without issuing a query when there are no sort columns.
+   * INTEGER_IDX is bound to colIndex, the position of the sort column within its
+   * descriptor; it is reset to 0 whenever the callback advances to the next descriptor
+   * and is kept across batches when a descriptor spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param sdIdToStorageDescriptor storage descriptor id mapped to the descriptor
+   *        whose sort columns are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSortColInBatch(Map<Long, MStorageDescriptor> 
sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSortCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"COLUMN_NAME\",\"ORDER\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?)";
+    int columnCount = 4;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = 
sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<MOrder> it = sdEntry.getValue().getSortCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MOrder order = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = order.getCol();
+            params[paramIndex++] = order.getOrder();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current descriptor exhausted, batch not yet full
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSortCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SORT_COLS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SKEWED_STRING_LIST table for every id in stringListIds,
+   * batching the rows through insertInBatch. Each id is bound to STRING_LIST_ID.
+   * The id iterator is a field of the anonymous callback, so each call to execute
+   * continues with the ids the previous batch did not consume.
+   * @param stringListIds ids to insert, in list order
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSkewedStringListInBatch(List<Long> stringListIds) throws 
MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"STRING_LIST_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST, columns, columnCount, row, rowCount, 
bec);
+  }
+  /**
+   * Inserts one row into the SKEWED_STRING_LIST_VALUES table for every string of
+   * every list in stringListIdToStringList, batching the rows through insertInBatch.
+   * Returns without issuing a query when all lists are empty.
+   * INTEGER_IDX is bound to colIndex, the position of the string within its list; it
+   * is reset to 0 whenever the callback advances to the next list and is kept across
+   * batches when a list spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param stringListIdToStringList string list id mapped to the strings to insert
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSkewedStringListValInBatch(Map<Long, List<String>> 
stringListIdToStringList) throws MetaException {
+    int rowCount = 0;
+    for (List<String> stringList : stringListIdToStringList.values()) {
+      rowCount += stringList.size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = 
"(\"STRING_LIST_ID\",\"STRING_LIST_VALUE\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, List<String>>> stringListIt = 
stringListIdToStringList.entrySet().iterator();
+      Map.Entry<Long, List<String>> stringListEntry = stringListIt.next();
+      Iterator<String> it = stringListEntry.getValue().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = stringListEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current list exhausted, batch not yet full
+            colIndex = 0;
+            stringListEntry = stringListIt.next();
+            it = stringListEntry.getValue().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST_VALUES, columns, columnCount, row, 
rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SKEWED_COL_NAMES table for every skewed column name of
+   * every storage descriptor in sdIdToStorageDescriptor, batching the rows through
+   * insertInBatch. Returns without issuing a query when there are no skewed columns.
+   * INTEGER_IDX is bound to colIndex, the position of the name within its descriptor;
+   * it is reset to 0 whenever the callback advances to the next descriptor and is kept
+   * across batches when a descriptor spans a batch boundary.
+   * Reading the first entry in the field initializer cannot fail on an empty map,
+   * because a non-zero rowCount implies at least one entry.
+   * @param sdIdToStorageDescriptor storage descriptor id mapped to the descriptor
+   *        whose skewed column names are inserted
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSkewedColInBatch(Map<Long, MStorageDescriptor> 
sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSkewedColNames().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"SKEWED_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = 
sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getSkewedColNames().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) { // current descriptor exhausted, batch not yet full
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSkewedColNames().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_NAMES, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SKEWED_VALUES table for every id in stringListIds,
+   * batching the rows through insertInBatch. SD_ID_OID is looked up for each id in
+   * stringListIdToSdId and the id itself is bound to STRING_LIST_ID_EID.
+   * INTEGER_IDX is bound to colIndex, which restarts at 0 each time the looked-up
+   * storage descriptor id differs from the one of the previous row; prevSdId and
+   * colIndex are fields, so the numbering carries over between batches.
+   * NOTE(review): the numbering presumably relies on ids of the same storage
+   * descriptor being adjacent in stringListIds - confirm against the caller.
+   * NOTE(review): prevSdId is a primitive long, so the comparison unboxes sdId; an id
+   * with no mapping in stringListIdToSdId would raise a NullPointerException there.
+   * @param stringListIds string list ids to insert, in list order
+   * @param stringListIdToSdId string list id mapped to its storage descriptor id
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSkewedValInBatch(List<Long> stringListIds, Map<Long, 
Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"SD_ID_OID\",\"STRING_LIST_ID_EID\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      long prevSdId = -1;
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Long stringListId = it.next();
+          Long sdId = stringListIdToSdId.get(stringListId);
+          params[paramIndex++] = sdId;
+          params[paramIndex++] = stringListId;
+          if (prevSdId != sdId) { // numeric comparison: sdId is unboxed
+            colIndex = 0;
+          }
+          params[paramIndex++] = colIndex++;
+          prevSdId = sdId;
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_VALUES, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the SKEWED_COL_VALUE_LOC_MAP table for every entry of
+   * stringListIdToLocation, batching the rows through insertInBatch. SD_ID is looked
+   * up by the entry key in stringListIdToSdId, the key is bound to STRING_LIST_ID_KID
+   * and the value to LOCATION.
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param stringListIdToLocation string list id mapped to the location to insert
+   * @param stringListIdToSdId string list id mapped to the value bound to SD_ID
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertSkewedLocationInBatch(Map<Long, String> 
stringListIdToLocation, Map<Long, Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIdToLocation.size();
+    String columns = "(\"SD_ID\",\"STRING_LIST_ID_KID\",\"LOCATION\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, String>> it = 
stringListIdToLocation.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, String> entry = it.next();
+          params[paramIndex++] = stringListIdToSdId.get(entry.getKey());
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = entry.getValue();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_VALUE_LOC_MAP, columns, columnCount, row, 
rowCount, bec);
+  }
+  /**
+   * Inserts one row into the PART_PRIVS table for every entry of
+   * partGrantIdToPrivilege, batching the rows through insertInBatch. The map key is
+   * bound to PART_GRANT_ID, PART_ID is looked up by that key in partGrantIdToPartId,
+   * and the other columns are read from the MPartitionPrivilege getters in
+   * column-list order.
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param partGrantIdToPrivilege grant id mapped to the partition privilege to insert
+   * @param partGrantIdToPartId grant id mapped to the value bound to PART_ID
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertPartitionPrivilegeInBatch(Map<Long, MPartitionPrivilege> 
partGrantIdToPrivilege,
+      Map<Long, Long> partGrantIdToPartId) throws MetaException {
+    int rowCount = partGrantIdToPrivilege.size();
+    String columns = 
"(\"PART_GRANT_ID\",\"AUTHORIZER\",\"CREATE_TIME\",\"GRANT_OPTION\",\"GRANTOR\",\"GRANTOR_TYPE\","
+        + "\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 10;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionPrivilege>> it = 
partGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionPrivilege> entry = it.next();
+          MPartitionPrivilege partPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partPrivilege.getAuthorizer();
+          params[paramIndex++] = partPrivilege.getCreateTime();
+          params[paramIndex++] = partPrivilege.getGrantOption();
+          params[paramIndex++] = partPrivilege.getGrantor();
+          params[paramIndex++] = partPrivilege.getGrantorType();
+          params[paramIndex++] = partGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partPrivilege.getPrincipalName();
+          params[paramIndex++] = partPrivilege.getPrincipalType();
+          params[paramIndex++] = partPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+  /**
+   * Inserts one row into the PART_COL_PRIVS table for every entry of
+   * partColumnGrantIdToPrivilege, batching the rows through insertInBatch. The map key
+   * is bound to PART_COLUMN_GRANT_ID, PART_ID is looked up by that key in
+   * partColumnGrantIdToPartId, and the other columns are read from the
+   * MPartitionColumnPrivilege getters in column-list order.
+   * The entry iterator is a field of the anonymous callback, so each call to execute
+   * continues with the entries the previous batch did not consume.
+   * @param partColumnGrantIdToPrivilege grant id mapped to the partition column
+   *        privilege to insert
+   * @param partColumnGrantIdToPartId grant id mapped to the value bound to PART_ID
+   * @throws MetaException declared by executeWithArray and insertInBatch
+   */
+  private void insertPartitionColPrivilegeInBatch(Map<Long, 
MPartitionColumnPrivilege> partColumnGrantIdToPrivilege,
+      Map<Long, Long> partColumnGrantIdToPartId) throws MetaException {
+    int rowCount = partColumnGrantIdToPrivilege.size();
+    String columns = 
"(\"PART_COLUMN_GRANT_ID\",\"AUTHORIZER\",\"COLUMN_NAME\",\"CREATE_TIME\",\"GRANT_OPTION\","
+        + 
"\"GRANTOR\",\"GRANTOR_TYPE\",\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_COL_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 11;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionColumnPrivilege>> it
+          = partColumnGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int 
batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionColumnPrivilege> entry = it.next();
+          MPartitionColumnPrivilege partColumnPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partColumnPrivilege.getAuthorizer();
+          params[paramIndex++] = partColumnPrivilege.getColumnName();
+          params[paramIndex++] = partColumnPrivilege.getCreateTime();
+          params[paramIndex++] = partColumnPrivilege.getGrantOption();
+          params[paramIndex++] = partColumnPrivilege.getGrantor();
+          params[paramIndex++] = partColumnPrivilege.getGrantorType();
+          params[paramIndex++] = partColumnGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partColumnPrivilege.getPrincipalName();
+          params[paramIndex++] = partColumnPrivilege.getPrincipalType();
+          params[paramIndex++] = partColumnPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_COL_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+
+  /**
+   * Add partitions in batch using direct SQL
+   * @param parts list of partitions
+   * @param partPrivilegesList list of partition privileges
+   * @param partColPrivilegesList list of partition column privileges
+   * @throws MetaException
+   */
+  public void addPartitions(List<MPartition> parts, 
List<List<MPartitionPrivilege>> partPrivilegesList,

Review Comment:
   agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to