This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f71b7156cae45d762d6849239c9199c032749bd0 Author: Tian Jiang <[email protected]> AuthorDate: Wed Dec 10 16:27:57 2025 +0800 Add DataRegionTask --- .../db/storageengine/dataregion/DataRegion.java | 76 ++++++++++----- .../dataregion/task/DataRegionTask.java | 61 ++++++++++++ .../dataregion/task/DataRegionTaskManager.java | 108 +++++++++++++++++++++ .../dataregion/task/SchemaEvolutionTask.java | 86 ++++++++++++++++ .../dataregion/tsfile/TsFileResource.java | 10 +- .../dataregion/tsfile/evolution/ColumnRename.java | 17 ++++ .../dataregion/tsfile/fileset/TsFileSet.java | 8 ++ .../storageengine/dataregion/DataRegionTest.java | 2 +- .../org/apache/iotdb/commons/utils/FileUtils.java | 81 ++++++++++++++++ 9 files changed, 420 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index f288e81f82c..b2a340d14bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -122,6 +122,8 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.task.DataRegionTaskManager; +import org.apache.iotdb.db.storageengine.dataregion.task.SchemaEvolutionTask; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -365,6 +367,8 @@ public class DataRegion 
implements IDataRegionForQuery { private Map<Long, TsFileSet> lastTsFileSetMap = new ConcurrentHashMap<>(); + private DataRegionTaskManager dataRegionTaskManager; + /** * Construct a database processor. * @@ -727,6 +731,9 @@ public class DataRegion implements IDataRegionForQuery { throw new DataRegionException(e); } + dataRegionTaskManager = new DataRegionTaskManager(this); + dataRegionTaskManager.recover(); + if (asyncTsFileResourceRecoverTaskList.isEmpty()) { initCompactionSchedule(); } @@ -756,9 +763,13 @@ public class DataRegion implements IDataRegionForQuery { protected void updateDeviceLastFlushTime(TsFileResource resource) { long timePartitionId = resource.getTimePartition(); Map<IDeviceID, Long> endTimeMap = new HashMap<>(); + EvolvedSchema mergedEvolvedSchema = resource.getMergedEvolvedSchema(); for (IDeviceID deviceId : resource.getDevices()) { @SuppressWarnings("OptionalGetWithoutIsPresent") // checked above long endTime = resource.getEndTime(deviceId).get(); + if (mergedEvolvedSchema != null) { + deviceId = mergedEvolvedSchema.rewriteDeviceId(deviceId); + } endTimeMap.put(deviceId, endTime); } if (config.isEnableSeparateData()) { @@ -773,10 +784,14 @@ public class DataRegion implements IDataRegionForQuery { long timePartitionId, List<TsFileResource> resources) { Map<IDeviceID, Long> endTimeMap = new HashMap<>(); for (TsFileResource resource : resources) { + EvolvedSchema mergedEvolvedSchema = resource.getMergedEvolvedSchema(); for (IDeviceID deviceId : resource.getDevices()) { // checked above //noinspection OptionalGetWithoutIsPresent long endTime = resource.getEndTime(deviceId).get(); + if (mergedEvolvedSchema != null) { + deviceId = mergedEvolvedSchema.rewriteDeviceId(deviceId); + } endTimeMap.put(deviceId, endTime); } } @@ -1001,6 +1016,10 @@ public class DataRegion implements IDataRegionForQuery { + TsFileSet.FILE_SET_DIR_NAME; } + public File getDataRegionSysDir() { + return dataRegionSysDir; + } + private List<TsFileSet> recoverTsFileSets( long 
partitionId, Map<Long, List<TsFileSet>> tsFileSetMap @@ -1189,7 +1208,7 @@ public class DataRegion implements IDataRegionForQuery { return newSet; } - public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) { + public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) throws IOException { long startTime = System.nanoTime(); writeLock("applySchemaEvolution"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -1198,32 +1217,44 @@ public class DataRegion implements IDataRegionForQuery { return; } DataNodeTableCache.getInstance().invalid(databaseName); - schemaEvolutions.forEach(lastFlushTimeMap::accept); syncCloseAllWorkingTsFileProcessors(); - for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) { - long partitionId = partitionVersionEntry.getKey(); - long maxVersion = partitionVersionEntry.getValue(); - lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> { - if (lastSet == null) { - lastSet = createNewFileSet(maxVersion, partitionId); - } else if (lastSet.getEndVersion() < maxVersion) { - lastSet = createNewFileSet(maxVersion, partitionId); - } - try { - lastSet.appendSchemaEvolution(schemaEvolutions); - } catch (IOException e) { - logger.error("Cannot append schema evolutions to fileSets in partition {}-{}", dataRegionId, partitionId, e); - } - return lastSet; - }); - } + // may update table names in deviceIds + schemaEvolutions.forEach(lastFlushTimeMap::accept); + + SchemaEvolutionTask evolutionTask = new SchemaEvolutionTask(schemaEvolutions, this); + dataRegionTaskManager.submitAndRun(evolutionTask); } finally { writeUnlock(); } } + public void recordSchemaEvolution(List<SchemaEvolution> schemaEvolutions) { + for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) { + long partitionId = partitionVersionEntry.getKey(); + long maxVersion = partitionVersionEntry.getValue(); + lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> { + 
if (lastSet == null) { + lastSet = createNewFileSet(maxVersion, partitionId); + } else if (lastSet.getEndVersion() < maxVersion) { + lastSet = createNewFileSet(maxVersion, partitionId); + } + try { + lastSet.appendSchemaEvolution(schemaEvolutions); + } catch (IOException e) { + logger.error("Cannot append schema evolutions to fileSets in partition {}-{}", dataRegionId, partitionId, e); + } + return lastSet; + }); + } + } + + public void applySchemaEvolutionToObjects(List<SchemaEvolution> schemaEvolutions) { + // TODO-SchemaEvolution + throw new UnsupportedOperationException(); + } + /** * insert one row of data. * @@ -2624,12 +2655,7 @@ public class DataRegion implements IDataRegionForQuery { List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { - EvolvedSchema evolvedSchema; - try { - evolvedSchema = tsFileResource.getMergedEvolvedSchema(); - } catch (IOException e) { - throw new MetadataException(e); - } + EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(); IDeviceID deviceIdBackThen = singleDeviceId; if (evolvedSchema != null) { deviceIdBackThen = evolvedSchema.rewriteDeviceId(singleDeviceId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java new file mode 100644 index 00000000000..f171003aa69 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.storageengine.dataregion.task;

import java.io.IOException;
import java.io.InputStream;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.utils.io.StreamSerializable;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;

/**
 * A unit of work bound to a {@link DataRegion} that can be serialized to disk (via
 * {@link StreamSerializable}) so it can be replayed after a restart.
 *
 * <p>The on-disk layout written/read here is: a var-int {@link TaskType} ordinal followed by the
 * task's own payload (see each implementation's {@code serialize}/{@code deserialize}).
 */
public interface DataRegionTask extends Runnable, StreamSerializable {

  /** Returns the id assigned by the task manager; unique within one data region. */
  long getTaskId();

  void setTaskId(long taskId);

  TaskType getTaskType();

  /** Closed set of persistable task kinds; ordinals are part of the on-disk format. */
  enum TaskType {
    SchemaEvolutionTask
  }

  /**
   * Reconstructs a task from {@code stream}.
   *
   * @param stream source positioned at the var-int type ordinal
   * @param taskId id recovered from the task file name
   * @param dataRegion region the task will operate on
   * @throws IOException if the type ordinal is out of range or the payload is unreadable
   */
  static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion dataRegion)
      throws IOException {
    final int typeOrdinal = ReadWriteForEncodingUtils.readVarInt(stream);
    final TaskType[] allTypes = TaskType.values();
    if (typeOrdinal < 0 || typeOrdinal >= allTypes.length) {
      throw new IOException("Invalid task type: " + typeOrdinal);
    }

    final TaskType taskType = allTypes[typeOrdinal];
    final DataRegionTask task;
    if (taskType == TaskType.SchemaEvolutionTask) {
      task = new SchemaEvolutionTask(dataRegion);
    } else {
      throw new IOException("Invalid task type: " + taskType);
    }
    task.deserialize(stream);
    task.setTaskId(taskId);
    return task;
  }
}
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java new file mode 100644 index 00000000000..5441b9b19c7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.storageengine.dataregion.task;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persists {@link DataRegionTask}s to disk before running them so that tasks interrupted by a
 * crash or restart are re-executed during {@link #recover()}.
 *
 * <p>Task files are named {@code <taskId>.tsk} and live under the data region's system directory.
 * A file is removed only after the task has run, so a death between persist and completion leaves
 * the file behind for replay.
 */
public class DataRegionTaskManager {

  private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionTaskManager.class);
  private static final String TASKS_DIR_NAME = "tasks";
  private static final String TASK_FILE_SUFFIX = ".tsk";

  private final DataRegion dataRegion;
  // Highest task id recovered or assigned so far; new ids are allocated strictly above it.
  private final AtomicLong latestTaskId = new AtomicLong(0);
  private final File tasksDir;

  public DataRegionTaskManager(DataRegion dataRegion) {
    this.dataRegion = dataRegion;
    this.tasksDir = new File(dataRegion.getDataRegionSysDir(), TASKS_DIR_NAME);
  }

  /**
   * Replays all persisted tasks in ascending task-id order, deleting each file afterwards. A task
   * whose file cannot be read is logged and skipped (its file is still deleted).
   */
  public void recover() {
    if (!tasksDir.mkdirs() && !tasksDir.isDirectory()) {
      LOGGER.warn("Cannot create task directory {}", tasksDir.getAbsolutePath());
      return;
    }
    File[] files = tasksDir.listFiles((File dir, String name) -> name.endsWith(TASK_FILE_SUFFIX));
    if (files == null) {
      return;
    }

    // BUGFIX: the original comparator parsed BOTH ids from fileName1, so the second id was
    // taken from the wrong file and the sort order was effectively undefined.
    Arrays.sort(files, (f1, f2) -> Long.compare(parseTaskId(f1), parseTaskId(f2)));

    for (File file : files) {
      long taskId = parseTaskId(file);
      latestTaskId.getAndUpdate(l -> Math.max(l, taskId));

      try (FileInputStream fis = new FileInputStream(file);
          BufferedInputStream bufferedInputStream = new BufferedInputStream(fis)) {
        DataRegionTask task = DataRegionTask.createFrom(bufferedInputStream, taskId, dataRegion);
        task.run();
      } catch (IOException e) {
        LOGGER.warn("Cannot recover task from file {}", file.getAbsolutePath(), e);
      } finally {
        if (!file.delete()) {
          LOGGER.warn("Cannot delete task file {}", file.getAbsolutePath());
        }
      }
    }
  }

  // Extracts the numeric id from a "<taskId>.tsk" file name.
  // NOTE(review): assumes every *.tsk name starts with a valid long — confirm nothing else
  // writes into this directory, otherwise NumberFormatException aborts recovery.
  private static long parseTaskId(File file) {
    String fileName = file.getName();
    return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
  }

  private void persistTask(DataRegionTask task) throws IOException {
    // Use the shared suffix constant instead of a second ".tsk" literal.
    File taskFile = new File(tasksDir, task.getTaskId() + TASK_FILE_SUFFIX);
    try (FileOutputStream fos = new FileOutputStream(taskFile);
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fos)) {
      task.serialize(bufferedOutputStream);
    }
  }

  private void removeTask(DataRegionTask task) throws IOException {
    File taskFile = new File(tasksDir, task.getTaskId() + TASK_FILE_SUFFIX);
    if (!taskFile.delete()) {
      LOGGER.warn("Cannot delete task file {}", taskFile.getAbsolutePath());
    }
  }

  /**
   * Assigns a fresh id, persists the task, runs it synchronously, then removes the persisted
   * file. If the process dies mid-run, the file survives and {@link #recover()} replays the task.
   *
   * @throws IOException if the task cannot be persisted
   */
  public void submitAndRun(DataRegionTask dataRegionTask) throws IOException {
    // incrementAndGet (not getAndIncrement) so a new task never reuses the id of the highest
    // task recovered from disk.
    dataRegionTask.setTaskId(latestTaskId.incrementAndGet());
    persistTask(dataRegionTask);
    dataRegionTask.run();
    removeTask(dataRegionTask);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.storageengine.dataregion.task;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;

/**
 * A persistable {@link DataRegionTask} that records a batch of {@link SchemaEvolution}s into the
 * region's file sets and then applies them to existing objects.
 *
 * <p>Payload format: var-int element count followed by each evolution's own serialization.
 */
public class SchemaEvolutionTask implements DataRegionTask {

  private final DataRegion dataRegion;
  // Set by the list constructor for new tasks, or by deserialize() during recovery.
  private List<SchemaEvolution> schemaEvolutions;
  private long taskId;

  /** Recovery constructor; the evolutions are filled in by {@link #deserialize(InputStream)}. */
  public SchemaEvolutionTask(DataRegion dataRegion) {
    this.dataRegion = dataRegion;
  }

  public SchemaEvolutionTask(List<SchemaEvolution> schemaEvolutions, DataRegion dataRegion) {
    this.schemaEvolutions = schemaEvolutions;
    this.dataRegion = dataRegion;
  }

  @Override
  public void run() {
    dataRegion.recordSchemaEvolution(schemaEvolutions);
    // NOTE(review): applySchemaEvolutionToObjects currently throws
    // UnsupportedOperationException in DataRegion — confirm callers tolerate that until
    // the TODO there is implemented.
    dataRegion.applySchemaEvolutionToObjects(schemaEvolutions);
  }

  @Override
  public long serialize(OutputStream stream) throws IOException {
    long size = ReadWriteForEncodingUtils.writeVarInt(getTaskType().ordinal(), stream);
    size += ReadWriteForEncodingUtils.writeVarInt(schemaEvolutions.size(), stream);
    for (SchemaEvolution schemaEvolution : schemaEvolutions) {
      size += schemaEvolution.serialize(stream);
    }
    return size;
  }

  @Override
  public void deserialize(InputStream stream) throws IOException {
    int size = ReadWriteForEncodingUtils.readVarInt(stream);
    if (size < 0) {
      // Guard against corrupt task files: the original fed the negative count straight
      // into new ArrayList<>(size), which throws an obscure IllegalArgumentException.
      throw new IOException("Negative schema evolution count: " + size);
    }
    schemaEvolutions = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      schemaEvolutions.add(SchemaEvolution.createFrom(stream));
    }
  }

  @Override
  public long getTaskId() {
    return taskId;
  }

  @Override
  public void setTaskId(long taskId) {
    this.taskId = taskId;
  }

  @Override
  public TaskType getTaskType() {
    return TaskType.SchemaEvolutionTask;
  }
}
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; +import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -32,6 +34,7 @@ public class ColumnRename implements SchemaEvolution { private String tableName; private String nameBefore; private String nameAfter; + private TSDataType dataType; // for deserialization public ColumnRename() {} @@ -58,6 +61,7 @@ public class ColumnRename implements SchemaEvolution { size += ReadWriteIOUtils.writeVar(tableName, stream); size += ReadWriteIOUtils.writeVar(nameBefore, stream); size += ReadWriteIOUtils.writeVar(nameAfter, stream); + size += ReadWriteIOUtils.write(dataType != null ? 
(byte) dataType.ordinal() : -1, stream); return size; } @@ -66,5 +70,18 @@ public class ColumnRename implements SchemaEvolution { tableName = ReadWriteIOUtils.readVarIntString(stream); nameBefore = ReadWriteIOUtils.readVarIntString(stream); nameAfter = ReadWriteIOUtils.readVarIntString(stream); + byte category = ReadWriteIOUtils.readByte(stream); + if (category != -1) { + dataType = TSDataType.values()[category]; + } + } + + public TSDataType getDataType() { + return dataType; + } + + public void setDataType( + TSDataType dataType) { + this.dataType = dataType; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index d3271d98b45..5c43418fa64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -120,4 +120,12 @@ public class TsFileSet implements Comparable<TsFileSet> { public long getEndVersion() { return endVersion; } + + @Override + public String toString() { + return "TsFileSet{" + + "endVersion=" + endVersion + + ", fileSetDir=" + fileSetDir + + '}'; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index b525b4860e7..e896a599c43 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -1786,7 +1786,7 @@ public class DataRegionTest { @Test public void testSchemaEvolution() - throws IllegalPathException, WriteProcessException, QueryProcessException { + throws 
IllegalPathException, WriteProcessException, QueryProcessException, IOException { String[] measurements = {"tag1", "s1", "s2"}; MeasurementSchema[] measurementSchemas = { new MeasurementSchema("tag1", TSDataType.STRING), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index baed8bcd537..a2510454e73 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -19,6 +19,12 @@ package org.apache.iotdb.commons.utils; +import java.util.HashSet; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.tsfile.external.commons.codec.digest.DigestUtils; @@ -334,6 +340,81 @@ public class FileUtils { return true; } + public static <T> List<T> applyReversedIndexesOnListV2( + final List<Integer> filteredIndexes, final List<T> originalList) { + // filteredIndexes.sort(null); if necessary + List<T> filteredList = new ArrayList<>(originalList.size() - filteredIndexes.size()); + int filteredIndexPos = 0; + int processingIndex = 0; + for (; processingIndex < originalList.size(); processingIndex++) { + if (filteredIndexPos >= filteredIndexes.size()) { + // all filteredIndexes processed, add remaining to the filteredList + filteredList.addAll(originalList.subList(processingIndex, originalList.size())); + break; + } else { + int filteredIndex = filteredIndexes.get(filteredIndexPos); + if (filteredIndex == processingIndex) { + // the index is filtered, move to the next filtered pos + filteredIndexPos ++; + } else { + // the index is not filtered, add to the filteredList + filteredList.add(originalList.get(processingIndex)); + } + } + } + return filteredList; + } + 
+ public static <T> List<T> applyReversedIndexesOnListV1( + final List<Integer> filteredIndexes, final List<T> originalList) { + final Set<Integer> indexes = new HashSet<>(filteredIndexes); + return Objects.nonNull(originalList) + ? IntStream.range(0, originalList.size()) + .filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标 + .mapToObj(originalList::get) + .collect(Collectors.toList()) + : null; + } + + public static void main(String[] args) { + int elementNum = 10_000_000; + int filteredNum = elementNum / 10; + Random random = new Random(); + List<Integer> originalList = IntStream.range(0, elementNum).boxed().collect(Collectors.toList()); + List<Integer> filteredIndexes = new ArrayList<>(filteredNum); + for (int i = 0; i < filteredNum; i++) { + filteredIndexes.add(random.nextInt(elementNum)); + } + filteredIndexes = filteredIndexes.stream().sorted().distinct().collect(Collectors.toList()); + + long start = System.currentTimeMillis(); + List<Integer> appliedList = applyReversedIndexesOnListV1(filteredIndexes, originalList); + System.out.println(System.currentTimeMillis() - start); + Set<Integer> appliedSet = new HashSet<>(appliedList); + for (Integer filteredIndex : filteredIndexes) { + if (appliedSet.contains(filteredIndex)) { + System.out.println("Incorrect implementation"); + System.exit(-1); + } + } + + + start = System.currentTimeMillis(); + appliedList = WapplyReversedIndexesOnListV2(filteredIndexes, originalList); + System.out.println(System.currentTimeMillis() - start); + appliedSet = new HashSet<>(appliedList); + if (appliedList.size() != originalList.size() - filteredIndexes.size()) { + System.out.println("Incorrect implementation"); + System.exit(-1); + } + for (Integer filteredIndex : filteredIndexes) { + if (appliedSet.contains(filteredIndex)) { + System.out.println("Incorrect implementation"); + System.exit(-1); + } + } + } + public static File createHardLink(File sourceFile, File hardlink) throws IOException { if 
(!hardlink.getParentFile().exists() && !hardlink.getParentFile().mkdirs()) { synchronized (FileUtils.class) {
