This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch new_object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 50291a686d061d4d6a6b2d92fdc75839c4d38461
Author: Haonan <[email protected]>
AuthorDate: Mon Jul 14 09:53:15 2025 +0800

    Fix object table split to multi regions (#15927)

    * Fix object table split to multi region
    * Fix object table split to multi region
    * fix npe
---
 .../node/write/RelationalInsertTabletNode.java | 115 +++++++++++++++------
 1 file changed, 83 insertions(+), 32 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 63ca8d385ff..1c5c4fbbd3a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -403,36 +403,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
         setDataRegionReplicaSet(entry.getKey());
         for (int i = 0; i < columns.length; i++) {
           if (dataTypes[i] == TSDataType.OBJECT) {
-            for (int j = 0; j < times.length; j++) {
-              byte[] binary = ((Binary[]) columns[i])[j].getValues();
-              ByteBuffer buffer = ByteBuffer.wrap(binary);
-              boolean isEoF = buffer.get() == 1;
-              long offset = buffer.getLong();
-              byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
-              String relativePath =
-                  TsFileNameGenerator.generateObjectFilePath(
-                      dataRegionReplicaSet.regionId.getId(), times[j], getDeviceID(j));
-              ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
-              objectNode.setDataRegionReplicaSet(entry.getKey());
-              result.add(objectNode);
-              if (isEoF) {
-                byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
-                byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
-                System.arraycopy(
-                    BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES);
-                System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
-                ((Binary[]) columns[i])[j] = new Binary(valueBytes);
-              } else {
-                ((Binary[]) columns[i])[j] = null;
-                if (bitMaps == null) {
-                  bitMaps = new BitMap[columns.length];
-                }
-                if (bitMaps[i] == null) {
-                  bitMaps[i] = new BitMap(rowCount);
-                }
-                bitMaps[i].mark(j);
-              }
-            }
+            handleObjectValue(i, 0, times.length, entry, result);
           }
         }
         result.add(this);
@@ -441,9 +412,89 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
     }
 
     for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
-      // TODO: add ObjectNode for split
-      result.add(generateOneSplit(entry));
+      result.addAll(generateOneSplitList(entry));
     }
     return result;
   }
+
+  private List<WritePlanNode> generateOneSplitList(
+      Map.Entry<TRegionReplicaSet, List<Integer>> entry) {
+    List<Integer> locs;
+    // generate a new times and values
+    locs = entry.getValue();
+    int count = 0;
+    for (int i = 0; i < locs.size(); i += 2) {
+      int start = locs.get(i);
+      int end = locs.get(i + 1);
+      count += end - start;
+    }
+
+    List<WritePlanNode> result = new ArrayList<>();
+
+    final InsertTabletNode subNode = getEmptySplit(count);
+    int destLoc = 0;
+
+    for (int k = 0; k < locs.size(); k += 2) {
+      int start = locs.get(k);
+      int end = locs.get(k + 1);
+      final int length = end - start;
+
+      System.arraycopy(times, start, subNode.times, destLoc, length);
+      for (int i = 0; i < subNode.columns.length; i++) {
+        if (dataTypes[i] != null) {
+          System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length);
+        }
+        if (dataTypes[i] == TSDataType.OBJECT) {
+          handleObjectValue(i, start, end, entry, result);
+        }
+        if (subNode.bitMaps != null && this.bitMaps[i] != null) {
+          BitMap.copyOfRange(this.bitMaps[i], start, subNode.bitMaps[i], destLoc, length);
+        }
+      }
+      destLoc += length;
+    }
+    subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
+    subNode.setRange(locs);
+    subNode.setDataRegionReplicaSet(entry.getKey());
+    result.add(subNode);
+    return result;
+  }
+
+  private void handleObjectValue(
+      int column,
+      int startRow,
+      int endRow,
+      Map.Entry<TRegionReplicaSet, List<Integer>> entry,
+      List<WritePlanNode> result) {
+    for (int j = startRow; j < endRow; j++) {
+      byte[] binary = ((Binary[]) columns[column])[j].getValues();
+      ByteBuffer buffer = ByteBuffer.wrap(binary);
+      boolean isEoF = buffer.get() == 1;
+      long offset = buffer.getLong();
+      byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
+      String relativePath =
+          TsFileNameGenerator.generateObjectFilePath(
+              entry.getKey().getRegionId().getId(), times[j], getDeviceID(j));
+      ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
+      objectNode.setDataRegionReplicaSet(entry.getKey());
+      result.add(objectNode);
+      if (isEoF) {
+        byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
+        byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+        System.arraycopy(
+            BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES);
+        System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
+        ((Binary[]) columns[column])[j] = new Binary(valueBytes);
+      } else {
+        ((Binary[]) columns[column])[j] = null;
+        if (bitMaps == null) {
+          bitMaps = new BitMap[columns.length];
+        }
+        if (bitMaps[column] == null) {
+          bitMaps[column] = new BitMap(rowCount);
+        }
+        bitMaps[column].mark(j);
+      }
+    }
+  }
 }
