This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 03a5c3d4e22c5d0e0a5fba769cef338fac7c5029 Author: Tian Jiang <[email protected]> AuthorDate: Wed Nov 26 19:11:02 2025 +0800 tmp save --- .../db/storageengine/dataregion/DataRegion.java | 43 +++++++- .../dataregion/tsfile/TsFileResource.java | 4 + .../dataregion/tsfile/evolution/ColumnRename.java | 74 ++++++++++++++ .../dataregion/tsfile/evolution/EvolvedSchema.java | 80 +++++++++++++++ .../tsfile/evolution/SchemaEvolution.java | 66 +++++++++++++ .../tsfile/evolution/SchemaEvolutionFile.java | 92 +++++++++++++++++ .../dataregion/tsfile/evolution/TableRename.java | 72 ++++++++++++++ .../dataregion/tsfile/fileset/TsFileSet.java | 109 +++++++++++++++++++++ 8 files changed, 537 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 0dbe6b8870f..734576d9f0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -126,6 +126,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; @@ -359,6 +360,8 @@ public class DataRegion implements IDataRegionForQuery { private ILoadDiskSelector ordinaryLoadDiskSelector; private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector; + private Map<Long, TsFileSet> lastTsFileSetMap = new ConcurrentHashMap<>(); + /** * Construct a database processor. * @@ -644,6 +647,9 @@ public class DataRegion implements IDataRegionForQuery { throw new RuntimeException(e); } } + // ensure that seq and unseq files in the same partition have the same TsFileSet + Map<Long, List<TsFileSet>> recoveredTsFileSetMap = new HashMap<>(); + for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { Callable<Void> asyncRecoverTask = recoverFilesInPartition( @@ -651,7 +657,8 @@ public class DataRegion implements IDataRegionForQuery { dataRegionRecoveryContext, partitionFiles.getValue(), fileTimeIndexMap, - true); + true, + recoveredTsFileSetMap); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); } @@ -664,7 +671,8 @@ public class DataRegion implements IDataRegionForQuery { dataRegionRecoveryContext, partitionFiles.getValue(), fileTimeIndexMap, - false); + false, + recoveredTsFileSetMap); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); } @@ -987,11 +995,40 @@ public class DataRegion implements IDataRegionForQuery { DataRegionRecoveryContext context, List<TsFileResource> resourceList, Map<TsFileID, FileTimeIndex> fileTimeIndexMap, - boolean isSeq) { + boolean isSeq, + Map<Long, List<TsFileSet>> tsFileSetMap) { + List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); Callable<Void> asyncRecoverTask = null; for (TsFileResource tsFileResource : resourceList) { + List<TsFileSet> tsFileSets = tsFileSetMap.computeIfAbsent(partitionId, + pid -> { + File fileSetDir = new File(dataRegionSysDir + File.separator + partitionId + File.separator + TsFileSet.FILE_SET_DIR_NAME); + File[] fileSets = fileSetDir.listFiles(); + if (fileSets == null || fileSets.length == 0) { + return Collections.emptyList(); + } else { + List<TsFileSet> results = new ArrayList<>(); + for (File fileSet : fileSets) { + TsFileSet tsFileSet; + try { + tsFileSet = new TsFileSet(Long.parseLong(fileSet.getName()), + fileSetDir.getAbsolutePath(), true); + } catch (NumberFormatException e) { + continue; + } + results.add(tsFileSet); + } + return results; + } + }); + if (!tsFileSets.isEmpty()) { + tsFileSets.sort(null); + } + + + tsFileManager.add(tsFileResource, isSeq); if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID()) && tsFileResource.resourceFileExists()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 3169cf85ccb..13c00febe3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEnt import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Modification; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; @@ -209,6 +210,9 @@ public class TsFileResource implements PersistentResource, Cloneable { private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues; + // TsFileSets this TsFile belongs to + private List<TsFileSet> tsFileSets; + @TestOnly public TsFileResource() { this.tsFileID = new TsFileID(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java new file mode 100644 index 00000000000..bc8b5946f79 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +/** + * A schema evolution operation that renames a column in a table schema. + */ +public class ColumnRename implements SchemaEvolution { + + private String tableName; + private String nameBefore; + private String nameAfter; + + // for deserialization + public ColumnRename() { + } + + public ColumnRename(String tableName, String nameBefore, String nameAfter) { + this.tableName = tableName.toLowerCase(); + this.nameBefore = nameBefore.toLowerCase(); + this.nameAfter = nameAfter.toLowerCase(); + } + + @Override + public SchemaEvolutionType getEvolutionType() { + return SchemaEvolutionType.COLUMN_RENAME; + } + + @Override + public void applyTo(EvolvedSchema evolvedSchema) { + evolvedSchema.renameColumn(tableName, nameBefore, nameAfter); + } + + @Override + public long serialize(OutputStream stream) throws IOException { + int size = ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), stream); + size += ReadWriteIOUtils.writeVar(tableName, stream); + size += ReadWriteIOUtils.writeVar(nameBefore, stream); + size += ReadWriteIOUtils.writeVar(nameAfter, stream); + return size; + } + + @Override + public void deserialize(InputStream stream) throws IOException { + tableName = ReadWriteIOUtils.readVarIntString(stream); + nameBefore = ReadWriteIOUtils.readVarIntString(stream); + nameAfter = ReadWriteIOUtils.readVarIntString(stream); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java new file mode 100644 index 00000000000..165c36fc99b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.tsfile.file.metadata.TableSchema; + +public class EvolvedSchema { + // the evolved table names after applying all schema evolution operations + private final Map<String, String> originalTableNames = new HashMap<>(); + /** + * the first key is the evolved table name, the second key is the evolved column name, + * and the value is the original column name before any schema evolution. + */ + private final Map<String, Map<String, String>> originalColumnNames = new HashMap<>(); + + public void renameTable(String oldTableName, String newTableName) { + if (!originalTableNames.containsKey(oldTableName)) { + originalTableNames.put(newTableName, oldTableName); + // mark the old table name as non-exists + originalTableNames.put(oldTableName, ""); + } else { + // mark the old table name as non-exists + String originalName = originalTableNames.put(oldTableName, ""); + originalTableNames.put(newTableName, originalName); + } + + if (originalColumnNames.containsKey(oldTableName)) { + Map<String, String> columnMap = originalColumnNames.remove(oldTableName); + originalColumnNames.put(newTableName, columnMap); + } + } + + public void renameColumn(String tableName, String oldColumnName, String newColumnName) { + Map<String, String> columnNameMap = originalColumnNames.computeIfAbsent(tableName, + t -> new LinkedHashMap<>()); + if (!columnNameMap.containsKey(oldColumnName)) { + columnNameMap.put(newColumnName, oldColumnName); + // mark the old column name as non-exists + columnNameMap.put(oldColumnName, ""); + } else { + String originalName = columnNameMap.put(oldColumnName, ""); + columnNameMap.put(newColumnName, originalName); + } + } + + public String getOriginalTableName(String evolvedTableName) { + return originalTableNames.getOrDefault(evolvedTableName, evolvedTableName); + } + + public String getOriginalColumnName(String tableName, String evolvedColumnName) { + Map<String, String> columnNameMap = originalColumnNames.get(tableName); + if (columnNameMap == null) { + return evolvedColumnName; + } + return columnNameMap.getOrDefault(evolvedColumnName, evolvedColumnName); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java new file mode 100644 index 00000000000..7b3b8b8e50d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.iotdb.db.utils.io.StreamSerializable; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; + +/** + * A schema evolution operation that can be applied to a TableSchemaMap. + */ +public interface SchemaEvolution extends StreamSerializable { + + /** + * Apply this schema evolution operation to the given metadata. + * + * @param schema the schema to apply the operation to + */ + void applyTo(EvolvedSchema schema); + + SchemaEvolutionType getEvolutionType(); + + enum SchemaEvolutionType { + TABLE_RENAME, + COLUMN_RENAME + } + + static SchemaEvolution createFrom(InputStream stream) throws IOException { + int type = ReadWriteForEncodingUtils.readVarInt(stream); + if (type < 0 || type > SchemaEvolutionType.values().length) { + throw new IOException("Invalid evolution type: " + type); + } + SchemaEvolution evolution = null; + SchemaEvolutionType evolutionType = SchemaEvolutionType.values()[type]; + switch (evolutionType) { + case TABLE_RENAME: + evolution = new TableRename(); + break; + case COLUMN_RENAME: + evolution = new ColumnRename(); + break; + default: + throw new IOException("Invalid evolution type: " + evolutionType); + } + evolution.deserialize(stream); + return evolution; + } +} \ No newline at end of file diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java new file mode 100644 index 00000000000..1f10683c7bf --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.Collection; +import org.apache.iotdb.commons.utils.FileUtils; + +/** + * SchemaEvolutionFile manages schema evolutions related to a TsFileSet. + */ +public class SchemaEvolutionFile { + public static final String FILE_SUFFIX = ".sevo"; + + private String filePath; + + public SchemaEvolutionFile(String filePath) { + this.filePath = filePath; + } + + private void recoverFile() throws IOException { + File file = new File(filePath); + if (!file.exists() || file.length() == 0) { + return; + } + + long length = file.length(); + String fileName = file.getName(); + long validLength = Long.parseLong(fileName.substring(fileName.lastIndexOf('.'))); + if (length > validLength) { + try (FileInputStream fis = new FileInputStream(file); + FileChannel fileChannel = fis.getChannel()) { + fileChannel.truncate(validLength); + } + } + } + + + public void append(Collection<SchemaEvolution> schemaEvolutions) throws IOException { + recoverFile(); + + try (FileOutputStream fos = new FileOutputStream(filePath, true); + BufferedOutputStream bos = new BufferedOutputStream(fos)) { + for (SchemaEvolution schemaEvolution : schemaEvolutions) { + schemaEvolution.serialize(bos); + } + } + + File originFile = new File(filePath); + long newLength = originFile.length(); + File newFile = new File(originFile.getParentFile(), newLength + FILE_SUFFIX); + FileUtils.moveFileSafe(originFile, newFile); + filePath = newFile.getAbsolutePath(); + } + + public EvolvedSchema readAsSchema() throws IOException { + recoverFile(); + + EvolvedSchema evolvedSchema = new EvolvedSchema(); + try (FileInputStream fis = new FileInputStream(filePath); + BufferedInputStream bis = new BufferedInputStream(fis)) { + while (bis.available() > 0) { + SchemaEvolution evolution = SchemaEvolution.createFrom(bis); + evolution.applyTo(evolvedSchema); + } + } + return evolvedSchema; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java new file mode 100644 index 00000000000..aac78060b61 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +/** + * A schema evolution operation that renames a table in a schema map. + */ +public class TableRename implements SchemaEvolution { + + private String nameBefore; + private String nameAfter; + + // for deserialization + public TableRename() { + } + + public TableRename(String nameBefore, String nameAfter) { + this.nameBefore = nameBefore.toLowerCase(); + this.nameAfter = nameAfter.toLowerCase(); + } + + + @Override + public void applyTo(EvolvedSchema evolvedSchema) { + evolvedSchema.renameTable(nameBefore, nameAfter); + } + + @Override + public SchemaEvolutionType getEvolutionType() { + return SchemaEvolutionType.TABLE_RENAME; + } + + @Override + public long serialize(OutputStream stream) throws IOException { + long size = ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), stream); + size += ReadWriteIOUtils.writeVar(nameBefore, stream); + size += ReadWriteIOUtils.writeVar(nameAfter, stream); + return size; + } + + @Override + public void deserialize(InputStream stream) throws IOException { + nameBefore = ReadWriteIOUtils.readVarIntString(stream); + nameAfter = ReadWriteIOUtils.readVarIntString(stream); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java new file mode 100644 index 00000000000..19ee2a7ed66 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset; + + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; + +/** + * TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. + */ +public class TsFileSet implements Comparable<TsFileSet> { + + public static final String FILE_SET_DIR_NAME = "filesets"; + + private final long endVersion; + private final File fileSetDir; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private SchemaEvolutionFile schemaEvolutionFile; + + public TsFileSet(long endVersion, String fileSetsDir, boolean recover) { + this.endVersion = endVersion; + this.fileSetDir = new File(fileSetsDir + File.separator + endVersion); + + if (recover) { + recover(); + } else { + //noinspection ResultOfMethodCallIgnored + fileSetDir.mkdirs(); + } + + if (schemaEvolutionFile == null) { + schemaEvolutionFile = new SchemaEvolutionFile(0 + SchemaEvolutionFile.FILE_SUFFIX); + } + } + + private void recover() { + File[] files = fileSetDir.listFiles(); + if (files != null) { + for (File file : files) { + if (file.getName().endsWith(SchemaEvolutionFile.FILE_SUFFIX)) { + schemaEvolutionFile = new SchemaEvolutionFile(file.getAbsolutePath()); + } + } + } + } + + public void appendSchemaEvolution(Collection<SchemaEvolution> schemaEvolutions) + throws IOException { + writeLock(); + try { + schemaEvolutionFile.append(schemaEvolutions); + } finally { + writeUnlock(); + } + } + + public EvolvedSchema readEvolvedSchema() throws IOException { + readLock(); + try { + return schemaEvolutionFile.readAsSchema(); + } finally { + readUnlock(); + } + } + + @Override + public int compareTo(TsFileSet o) { + return Long.compare(endVersion, o.endVersion); + } + + public void writeLock() { + lock.writeLock().lock(); + } + + public void readLock() { + lock.readLock().lock(); + } + + public void writeUnlock() { + lock.writeLock().unlock(); + } + + public void readUnlock() { + lock.readLock().unlock(); + } +}
