[
https://issues.apache.org/jira/browse/HIVE-26035?focusedWorklogId=838276&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-838276
]
ASF GitHub Bot logged work on HIVE-26035:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jan/23 10:00
Start Date: 10/Jan/23 10:00
Worklog Time Spent: 10m
Work Description: dengzhhu653 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1065570458
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -515,6 +529,803 @@ public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaE
}
}
+ private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+ ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+ AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+ if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+ return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1);
+ } else {
+ throw new MetaException("Identity type is not datastore.");
+ }
+ }
+
+ /**
+ * Interface to execute multiple row insert query in batch for direct SQL
+ */
+ interface BatchExecutionContext {
+ void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException;
+ }
+
+ private void insertInBatch(String tableName, String columns, int columnCount, String rowFormat, int rowCount,
+ BatchExecutionContext bec) throws MetaException {
+ if (rowCount == 0 || columnCount == 0) {
+ return;
+ }
+ int maxParamsCount = maxParamsInInsert;
+ if (maxParamsCount < columnCount) {
+ LOG.error("Maximum number of parameters in the direct SQL batch insert query is less than the table: {}"
+ + " columns. Executing single row insert queries.", tableName);
+ maxParamsCount = columnCount;
+ }
+ int maxRowsInBatch = maxParamsCount / columnCount;
+ int maxBatches = rowCount / maxRowsInBatch;
+ int last = rowCount % maxRowsInBatch;
+ String query = "";
+ if (maxBatches > 0) {
+ query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch);
+ }
+ int batchParamCount = maxRowsInBatch * columnCount;
+ for (int batch = 0; batch < maxBatches; batch++) {
+ bec.execute(query, maxRowsInBatch, batchParamCount);
+ }
+ if (last != 0) {
+ query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+ bec.execute(query, last, last * columnCount);
+ }
+ }
+
+ private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+ int rowCount = serdeIdToSerDeInfo.size();
+ String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+ + "\"SERIALIZER_CLASS\")";
+ String row = "(?,?,?,?,?,?,?)";
+ int columnCount = 7;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, MSerDeInfo> entry = it.next();
+ MSerDeInfo serdeInfo = entry.getValue();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = serdeInfo.getDescription();
+ params[paramIndex++] = serdeInfo.getDeserializerClass();
+ params[paramIndex++] = serdeInfo.getName();
+ params[paramIndex++] = serdeInfo.getSerdeType();
+ params[paramIndex++] = serdeInfo.getSerializationLib();
+ params[paramIndex++] = serdeInfo.getSerializerClass();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SERDES, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor,
+ Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException {
+ int rowCount = sdIdToStorageDescriptor.size();
+ String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+ + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+ String row = "(?,?,?,?,?,?,?,?,?)";
+ int columnCount = 9;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, MStorageDescriptor> entry = it.next();
+ MStorageDescriptor sd = entry.getValue();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+ params[paramIndex++] = sd.getInputFormat();
+ params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+ params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories());
+ params[paramIndex++] = sd.getLocation();
+ params[paramIndex++] = sd.getNumBuckets();
+ params[paramIndex++] = sd.getOutputFormat();
+ params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SDS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId)
+ throws MetaException {
+ int rowCount = partIdToPartition.size();
+ String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+ + "\"WRITE_ID\")";
+ String row = "(?,?,?,?,?,?,?)";
+ int columnCount = 7;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, MPartition> entry = it.next();
+ MPartition partition = entry.getValue();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = partition.getCreateTime();
+ params[paramIndex++] = partition.getLastAccessTime();
+ params[paramIndex++] = partition.getPartitionName();
+ params[paramIndex++] = partIdToSdId.get(entry.getKey());
+ params[paramIndex++] = partition.getTable().getId();
+ params[paramIndex++] = partition.getWriteId();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(PARTITIONS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+ int rowCount = 0;
+ for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+ rowCount += serDeInfo.getParameters().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator();
+ Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+ Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters().entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ params[paramIndex++] = serdeEntry.getKey();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = entry.getValue();
+ index++;
+ }
+ if (index < batchRowCount) {
+ serdeEntry = serdeIt.next();
+ it = serdeEntry.getValue().getParameters().entrySet().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SERDE_PARAMS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertStorageDescriptorParamInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor)
+ throws MetaException {
+ int rowCount = 0;
+ for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+ rowCount += sd.getParameters().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+ Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+ Iterator<Map.Entry<String, String>> it = sdEntry.getValue().getParameters().entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ params[paramIndex++] = sdEntry.getKey();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = entry.getValue();
+ index++;
+ }
+ if (index < batchRowCount) {
+ sdEntry = sdIt.next();
+ it = sdEntry.getValue().getParameters().entrySet().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SD_PARAMS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertPartitionParamInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+ int rowCount = 0;
+ for (MPartition part : partIdToPartition.values()) {
+ rowCount += part.getParameters().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"PART_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+ Map.Entry<Long, MPartition> partEntry = partIt.next();
+ Iterator<Map.Entry<String, String>> it = partEntry.getValue().getParameters().entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ params[paramIndex++] = partEntry.getKey();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = entry.getValue();
+ index++;
+ }
+ if (index < batchRowCount) {
+ partEntry = partIt.next();
+ it = partEntry.getValue().getParameters().entrySet().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(PARTITION_PARAMS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertPartitionKeyValInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+ int rowCount = 0;
+ for (MPartition part : partIdToPartition.values()) {
+ rowCount += part.getValues().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"PART_ID\",\"PART_KEY_VAL\",\"INTEGER_IDX\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+ Map.Entry<Long, MPartition> partEntry = partIt.next();
+ Iterator<String> it = partEntry.getValue().getValues().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ params[paramIndex++] = partEntry.getKey();
+ params[paramIndex++] = it.next();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ partEntry = partIt.next();
+ it = partEntry.getValue().getValues().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(PARTITION_KEY_VALS, columns, columnCount, row, rowCount, bec);
+ }
+
+
+ private void insertColumnDescriptorInBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+ int rowCount = cdIdToColumnDescriptor.size();
+ String columns = "(\"CD_ID\")";
+ String row = "(?)";
+ int columnCount = 1;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Long> it = cdIdToColumnDescriptor.keySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ params[paramIndex++] = it.next();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(CDS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertColumnV2InBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+ int rowCount = 0;
+ for (MColumnDescriptor cd : cdIdToColumnDescriptor.values()) {
+ rowCount += cd.getCols().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"CD_ID\",\"COMMENT\",\"COLUMN_NAME\",\"TYPE_NAME\",\"INTEGER_IDX\")";
+ String row = "(?,?,?,?,?)";
+ int columnCount = 5;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, MColumnDescriptor>> cdIt = cdIdToColumnDescriptor.entrySet().iterator();
+ Map.Entry<Long, MColumnDescriptor> cdEntry = cdIt.next();
+ Iterator<MFieldSchema> it = cdEntry.getValue().getCols().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ MFieldSchema fieldSchema = it.next();
+ params[paramIndex++] = cdEntry.getKey();
+ params[paramIndex++] = fieldSchema.getComment();
+ params[paramIndex++] = fieldSchema.getName();
+ params[paramIndex++] = fieldSchema.getType();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ cdEntry = cdIt.next();
+ it = cdEntry.getValue().getCols().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(COLUMNS_V2, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertBucketColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+ int rowCount = 0;
+ for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+ rowCount += sd.getBucketCols().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"SD_ID\",\"BUCKET_COL_NAME\",\"INTEGER_IDX\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+ Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+ Iterator<String> it = sdEntry.getValue().getBucketCols().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ params[paramIndex++] = sdEntry.getKey();
+ params[paramIndex++] = it.next();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ sdEntry = sdIt.next();
+ it = sdEntry.getValue().getBucketCols().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(BUCKETING_COLS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSortColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+ int rowCount = 0;
+ for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+ rowCount += sd.getSortCols().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"SD_ID\",\"COLUMN_NAME\",\"ORDER\",\"INTEGER_IDX\")";
+ String row = "(?,?,?,?)";
+ int columnCount = 4;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+ Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+ Iterator<MOrder> it = sdEntry.getValue().getSortCols().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ MOrder order = it.next();
+ params[paramIndex++] = sdEntry.getKey();
+ params[paramIndex++] = order.getCol();
+ params[paramIndex++] = order.getOrder();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ sdEntry = sdIt.next();
+ it = sdEntry.getValue().getSortCols().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SORT_COLS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSkewedStringListInBatch(List<Long> stringListIds) throws MetaException {
+ int rowCount = stringListIds.size();
+ String columns = "(\"STRING_LIST_ID\")";
+ String row = "(?)";
+ int columnCount = 1;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Long> it = stringListIds.iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ params[paramIndex++] = it.next();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SKEWED_STRING_LIST, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSkewedStringListValInBatch(Map<Long, List<String>> stringListIdToStringList) throws MetaException {
+ int rowCount = 0;
+ for (List<String> stringList : stringListIdToStringList.values()) {
+ rowCount += stringList.size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"STRING_LIST_ID\",\"STRING_LIST_VALUE\",\"INTEGER_IDX\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, List<String>>> stringListIt = stringListIdToStringList.entrySet().iterator();
+ Map.Entry<Long, List<String>> stringListEntry = stringListIt.next();
+ Iterator<String> it = stringListEntry.getValue().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ params[paramIndex++] = stringListEntry.getKey();
+ params[paramIndex++] = it.next();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ stringListEntry = stringListIt.next();
+ it = stringListEntry.getValue().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SKEWED_STRING_LIST_VALUES, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSkewedColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+ int rowCount = 0;
+ for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+ rowCount += sd.getSkewedColNames().size();
+ }
+ if (rowCount == 0) {
+ return;
+ }
+ String columns = "(\"SD_ID\",\"SKEWED_COL_NAME\",\"INTEGER_IDX\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+ Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+ Iterator<String> it = sdEntry.getValue().getSkewedColNames().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int index = 0;
+ int paramIndex = 0;
+ do {
+ while (index < batchRowCount && it.hasNext()) {
+ params[paramIndex++] = sdEntry.getKey();
+ params[paramIndex++] = it.next();
+ params[paramIndex++] = colIndex++;
+ index++;
+ }
+ if (index < batchRowCount) {
+ colIndex = 0;
+ sdEntry = sdIt.next();
+ it = sdEntry.getValue().getSkewedColNames().iterator();
+ }
+ } while (index < batchRowCount);
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SKEWED_COL_NAMES, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSkewedValInBatch(List<Long> stringListIds, Map<Long, Long> stringListIdToSdId)
+ throws MetaException {
+ int rowCount = stringListIds.size();
+ String columns = "(\"SD_ID_OID\",\"STRING_LIST_ID_EID\",\"INTEGER_IDX\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ int colIndex = 0;
+ long prevSdId = -1;
+ final Iterator<Long> it = stringListIds.iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Long stringListId = it.next();
+ Long sdId = stringListIdToSdId.get(stringListId);
+ params[paramIndex++] = sdId;
+ params[paramIndex++] = stringListId;
+ if (prevSdId != sdId) {
+ colIndex = 0;
+ }
+ params[paramIndex++] = colIndex++;
+ prevSdId = sdId;
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SKEWED_VALUES, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertSkewedLocationInBatch(Map<Long, String> stringListIdToLocation, Map<Long, Long> stringListIdToSdId)
+ throws MetaException {
+ int rowCount = stringListIdToLocation.size();
+ String columns = "(\"SD_ID\",\"STRING_LIST_ID_KID\",\"LOCATION\")";
+ String row = "(?,?,?)";
+ int columnCount = 3;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, String>> it = stringListIdToLocation.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, String> entry = it.next();
+ params[paramIndex++] = stringListIdToSdId.get(entry.getKey());
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = entry.getValue();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(SKEWED_COL_VALUE_LOC_MAP, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertPartitionPrivilegeInBatch(Map<Long, MPartitionPrivilege> partGrantIdToPrivilege,
+ Map<Long, Long> partGrantIdToPartId) throws MetaException {
+ int rowCount = partGrantIdToPrivilege.size();
+ String columns = "(\"PART_GRANT_ID\",\"AUTHORIZER\",\"CREATE_TIME\",\"GRANT_OPTION\",\"GRANTOR\",\"GRANTOR_TYPE\","
+ + "\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_PRIV\")";
+ String row = "(?,?,?,?,?,?,?,?,?,?)";
+ int columnCount = 10;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MPartitionPrivilege>> it = partGrantIdToPrivilege.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, MPartitionPrivilege> entry = it.next();
+ MPartitionPrivilege partPrivilege = entry.getValue();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = partPrivilege.getAuthorizer();
+ params[paramIndex++] = partPrivilege.getCreateTime();
+ params[paramIndex++] = partPrivilege.getGrantOption();
+ params[paramIndex++] = partPrivilege.getGrantor();
+ params[paramIndex++] = partPrivilege.getGrantorType();
+ params[paramIndex++] = partGrantIdToPartId.get(entry.getKey());
+ params[paramIndex++] = partPrivilege.getPrincipalName();
+ params[paramIndex++] = partPrivilege.getPrincipalType();
+ params[paramIndex++] = partPrivilege.getPrivilege();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(PART_PRIVS, columns, columnCount, row, rowCount, bec);
+ }
+
+ private void insertPartitionColPrivilegeInBatch(Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege,
+ Map<Long, Long> partColumnGrantIdToPartId) throws MetaException {
+ int rowCount = partColumnGrantIdToPrivilege.size();
+ String columns = "(\"PART_COLUMN_GRANT_ID\",\"AUTHORIZER\",\"COLUMN_NAME\",\"CREATE_TIME\",\"GRANT_OPTION\","
+ + "\"GRANTOR\",\"GRANTOR_TYPE\",\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_COL_PRIV\")";
+ String row = "(?,?,?,?,?,?,?,?,?,?,?)";
+ int columnCount = 11;
+ BatchExecutionContext bec = new BatchExecutionContext() {
+ final Iterator<Map.Entry<Long, MPartitionColumnPrivilege>> it
+ = partColumnGrantIdToPrivilege.entrySet().iterator();
+ @Override
+ public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+ Object[] params = new Object[batchParamCount];
+ int paramIndex = 0;
+ for (int index = 0; index < batchRowCount; index++) {
+ Map.Entry<Long, MPartitionColumnPrivilege> entry = it.next();
+ MPartitionColumnPrivilege partColumnPrivilege = entry.getValue();
+ params[paramIndex++] = entry.getKey();
+ params[paramIndex++] = partColumnPrivilege.getAuthorizer();
+ params[paramIndex++] = partColumnPrivilege.getColumnName();
+ params[paramIndex++] = partColumnPrivilege.getCreateTime();
+ params[paramIndex++] = partColumnPrivilege.getGrantOption();
+ params[paramIndex++] = partColumnPrivilege.getGrantor();
+ params[paramIndex++] = partColumnPrivilege.getGrantorType();
+ params[paramIndex++] = partColumnGrantIdToPartId.get(entry.getKey());
+ params[paramIndex++] = partColumnPrivilege.getPrincipalName();
+ params[paramIndex++] = partColumnPrivilege.getPrincipalType();
+ params[paramIndex++] = partColumnPrivilege.getPrivilege();
+ }
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+ executeWithArray(query.getInnerQuery(), params, batchQueryText);
+ }
+ }
+ };
+ insertInBatch(PART_COL_PRIVS, columns, columnCount, row, rowCount, bec);
+ }
+
+ /**
+ * Add partitions in batch using direct SQL
+ * @param parts list of partitions
+ * @param partPrivilegesList list of partition privileges
+ * @param partColPrivilegesList list of partition column privileges
+ * @throws MetaException
+ */
+ public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>> partPrivilegesList,
+ List<List<MPartitionColumnPrivilege>> partColPrivilegesList) throws MetaException {
+ Map<Long, MSerDeInfo> serdeIdToSerDeInfo = new HashMap<>();
+ Map<Long, MColumnDescriptor> cdIdToColumnDescriptor = new HashMap<>();
+ Map<Long, MStorageDescriptor> sdIdToStorageDescriptor = new HashMap<>();
+ Map<Long, MPartition> partIdToPartition = new HashMap<>();
+ Map<Long, MPartitionPrivilege> partGrantIdToPrivilege = new HashMap<>();
+ Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege = new HashMap<>();
+ Map<Long, Long> sdIdToSerdeId = new HashMap<>();
+ Map<Long, Long> sdIdToCdId = new HashMap<>();
+ Map<Long, Long> partIdToSdId = new HashMap<>();
+ Map<Long, List<String>> stringListIdToStringList = new HashMap<>();
+ Map<Long, Long> stringListIdToSdId = new HashMap<>();
+ Map<Long, String> stringListIdToLocation = new HashMap<>();
+ Map<Long, Long> partGrantIdToPartId = new HashMap<>();
+ Map<Long, Long> partColumnGrantIdToPartId = new HashMap<>();
+ List<Long> stringListIds = new ArrayList<>();
+ int partitionsCount = parts.size();
+ for (int index = 0; index < partitionsCount; index++) {
+ MPartition part = parts.get(index);
+ Long serDeId = getDataStoreId(MSerDeInfo.class);
+ serdeIdToSerDeInfo.put(serDeId, part.getSd().getSerDeInfo());
+
+ Long cdId;
+ DatastoreId storeId = (DatastoreId) pm.getObjectId(part.getSd().getCD());
+ if (storeId == null) {
+ cdId = getDataStoreId(MColumnDescriptor.class);
+ cdIdToColumnDescriptor.put(cdId, part.getSd().getCD());
+ } else {
+ cdId = (Long) storeId.getKeyAsObject();
+ }
+
+ Long sdId = getDataStoreId(MStorageDescriptor.class);
+ sdIdToStorageDescriptor.put(sdId, part.getSd());
+ sdIdToSerdeId.put(sdId, serDeId);
+ sdIdToCdId.put(sdId, cdId);
+
+ Long partId = getDataStoreId(MPartition.class);
Review Comment:
I'm wondering if the unique id would conflict when there are multiple Metastore instances in the warehouse.
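As context for the batching logic in insertInBatch() above, here is a minimal, self-contained sketch of its row/parameter arithmetic. It is not part of the patch: the parameter cap, the 3-column table, and the row count are made-up values, and the SQL shown in the comment is only the general multi-row VALUES shape that dbType.getBatchInsertQuery is expected to produce.

// Illustrative sketch only -- not part of the PR. It mirrors insertInBatch()'s
// splitting of rows into full statements plus a trailing statement, using a
// hypothetical 3-column table and an assumed cap on bind parameters.
public class BatchSplitSketch {
  public static void main(String[] args) {
    int maxParamsCount = 6;   // assumed parameter cap (the patch reads maxParamsInInsert)
    int columnCount = 3;      // e.g. SERDE_PARAMS: SERDE_ID, PARAM_KEY, PARAM_VALUE
    int rowCount = 7;         // rows waiting to be inserted

    int maxRowsInBatch = maxParamsCount / columnCount;  // 2 rows per statement
    int maxBatches = rowCount / maxRowsInBatch;         // 3 full statements
    int last = rowCount % maxRowsInBatch;               // 1 row in a trailing statement

    // Each full batch binds maxRowsInBatch * columnCount parameters against a
    // multi-row insert of the general shape:
    //   INSERT INTO "SERDE_PARAMS" (...) VALUES (?,?,?),(?,?,?)
    for (int batch = 0; batch < maxBatches; batch++) {
      System.out.printf("batch %d: %d rows, %d bind params%n",
          batch, maxRowsInBatch, maxRowsInBatch * columnCount);
    }
    if (last != 0) {
      // The trailing statement binds last * columnCount parameters.
      System.out.printf("tail: %d rows, %d bind params%n", last, last * columnCount);
    }
  }
}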
Issue Time Tracking
-------------------
Worklog Id: (was: 838276)
Time Spent: 1h 10m (was: 1h)
> Explore moving to directsql for ObjectStore::addPartitions
> ----------------------------------------------------------
>
> Key: HIVE-26035
> URL: https://issues.apache.org/jira/browse/HIVE-26035
> Project: Hive
> Issue Type: Bug
> Reporter: Rajesh Balamohan
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Currently {{addPartitions}} uses DataNucleus and is super slow for a large
> number of partitions. It will be good to move to direct SQL. Lots of repeated
> SQLs can be avoided as well (e.g. SDS, SERDE, TABLE_PARAMS).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)