flyrain commented on code in PR #11555: URL: https://github.com/apache/iceberg/pull/11555#discussion_r1901388519
########## core/src/main/java/org/apache/iceberg/TableMetadataUtil.java: ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +public class TableMetadataUtil { Review Comment: Can we remove it as it is not used anywhere? ########## core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Rewrite table path action. */ +public class RewriteTablePathUtil { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathUtil.class); + + private RewriteTablePathUtil() {} + + /** + * Rewrite result. 
+ * + * @param <T> type of file to rewrite + */ + public static class RewriteResult<T> implements Serializable { + private final Set<T> toRewrite = Sets.newHashSet(); + private final Set<Pair<String, String>> copyPlan = Sets.newHashSet(); + + public RewriteResult() {} + + public RewriteResult<T> append(RewriteResult<T> r1) { + toRewrite.addAll(r1.toRewrite); + copyPlan.addAll(r1.copyPlan); + return this; + } + + /** Returns next list of files to rewrite (discovered by rewriting this file) */ + public Set<T> toRewrite() { + return toRewrite; + } + + /** + * Returns a copy plan of files whose metadata were rewritten, for each file a source and target + * location + */ + public Set<Pair<String, String>> copyPlan() { + return copyPlan; + } + } + + /** + * Create a new table metadata object, replacing path references + * + * @param metadata source table metadata + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return copy of table metadata with paths replaced + */ + public static TableMetadata replacePaths( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + String newLocation = metadata.location().replaceFirst(sourcePrefix, targetPrefix); + List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, sourcePrefix, targetPrefix); + List<TableMetadata.MetadataLogEntry> metadataLogEntries = + updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix); + long snapshotId = + metadata.currentSnapshot() == null ? 
-1 : metadata.currentSnapshot().snapshotId(); + Map<String, String> properties = + updateProperties(metadata.properties(), sourcePrefix, targetPrefix); + + return new TableMetadata( + null, + metadata.formatVersion(), + metadata.uuid(), + newLocation, + metadata.lastSequenceNumber(), + metadata.lastUpdatedMillis(), + metadata.lastColumnId(), + metadata.currentSchemaId(), + metadata.schemas(), + metadata.defaultSpecId(), + metadata.specs(), + metadata.lastAssignedPartitionId(), + metadata.defaultSortOrderId(), + metadata.sortOrders(), + properties, + snapshotId, + newSnapshots, + null, + metadata.snapshotLog(), + metadataLogEntries, + metadata.refs(), + metadata.statisticsFiles(), + metadata.partitionStatisticsFiles(), Review Comment: Can we add a `todo` comment here to avoid confusion? People may ask why these file paths are not updated. ########## core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Rewrite table path action. */ +public class RewriteTablePathUtil { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathUtil.class); + + private RewriteTablePathUtil() {} + + /** + * Rewrite result. 
+ * + * @param <T> type of file to rewrite + */ + public static class RewriteResult<T> implements Serializable { + private final Set<T> toRewrite = Sets.newHashSet(); + private final Set<Pair<String, String>> copyPlan = Sets.newHashSet(); + + public RewriteResult() {} + + public RewriteResult<T> append(RewriteResult<T> r1) { + toRewrite.addAll(r1.toRewrite); + copyPlan.addAll(r1.copyPlan); + return this; + } + + /** Returns next list of files to rewrite (discovered by rewriting this file) */ + public Set<T> toRewrite() { + return toRewrite; + } + + /** + * Returns a copy plan of files whose metadata were rewritten, for each file a source and target + * location + */ + public Set<Pair<String, String>> copyPlan() { + return copyPlan; + } + } + + /** + * Create a new table metadata object, replacing path references + * + * @param metadata source table metadata + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return copy of table metadata with paths replaced + */ + public static TableMetadata replacePaths( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + String newLocation = metadata.location().replaceFirst(sourcePrefix, targetPrefix); + List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, sourcePrefix, targetPrefix); + List<TableMetadata.MetadataLogEntry> metadataLogEntries = + updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix); + long snapshotId = + metadata.currentSnapshot() == null ? 
-1 : metadata.currentSnapshot().snapshotId(); + Map<String, String> properties = + updateProperties(metadata.properties(), sourcePrefix, targetPrefix); + + return new TableMetadata( + null, + metadata.formatVersion(), + metadata.uuid(), + newLocation, + metadata.lastSequenceNumber(), + metadata.lastUpdatedMillis(), + metadata.lastColumnId(), + metadata.currentSchemaId(), + metadata.schemas(), + metadata.defaultSpecId(), + metadata.specs(), + metadata.lastAssignedPartitionId(), + metadata.defaultSortOrderId(), + metadata.sortOrders(), + properties, + snapshotId, + newSnapshots, + null, + metadata.snapshotLog(), + metadataLogEntries, + metadata.refs(), + metadata.statisticsFiles(), + metadata.partitionStatisticsFiles(), + metadata.changes()); + } + + private static Map<String, String> updateProperties( + Map<String, String> tableProperties, String sourcePrefix, String targetPrefix) { + Map<String, String> properties = Maps.newHashMap(tableProperties); + updatePathInProperty(properties, sourcePrefix, targetPrefix, TableProperties.OBJECT_STORE_PATH); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_DATA_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_METADATA_LOCATION); + + return properties; + } + + private static void updatePathInProperty( + Map<String, String> properties, + String sourcePrefix, + String targetPrefix, + String propertyName) { + if (properties.containsKey(propertyName)) { + properties.put( + propertyName, newPath(properties.get(propertyName), sourcePrefix, targetPrefix)); + } + } + + private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List<TableMetadata.MetadataLogEntry> metadataLogEntries = + 
Lists.newArrayListWithCapacity(metadata.previousFiles().size()); + for (TableMetadata.MetadataLogEntry metadataLog : metadata.previousFiles()) { + TableMetadata.MetadataLogEntry newMetadataLog = + new TableMetadata.MetadataLogEntry( + metadataLog.timestampMillis(), + newPath(metadataLog.file(), sourcePrefix, targetPrefix)); + metadataLogEntries.add(newMetadataLog); + } + return metadataLogEntries; + } + + private static List<Snapshot> updatePathInSnapshots( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List<Snapshot> newSnapshots = Lists.newArrayListWithCapacity(metadata.snapshots().size()); + for (Snapshot snapshot : metadata.snapshots()) { + String newManifestListLocation = + newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix); + Snapshot newSnapshot = + new BaseSnapshot( + snapshot.sequenceNumber(), + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.timestampMillis(), + snapshot.operation(), + snapshot.summary(), + snapshot.schemaId(), + newManifestListLocation); + newSnapshots.add(newSnapshot); + } + return newSnapshots; + } + + /** + * Rewrite a manifest list representing a snapshot, replacing path references. 
+ * + * @param snapshot snapshot represented by the manifest list + * @param io file io + * @param tableMetadata metadata of table + * @param manifestsToRewrite a list of manifest files to filter for rewrite + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @param stagingDir staging directory + * @param outputPath location to write the manifest list + * @return a copy plan for manifest files whose metadata were contained in the rewritten manifest + * list + */ + public static RewriteResult<ManifestFile> rewriteManifestList( + Snapshot snapshot, + FileIO io, + TableMetadata tableMetadata, + Set<String> manifestsToRewrite, + String sourcePrefix, + String targetPrefix, + String stagingDir, + String outputPath) { + RewriteResult<ManifestFile> result = new RewriteResult<>(); + OutputFile outputFile = io.newOutputFile(outputPath); + + List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot); + List<ManifestFile> manifestFilesToRewrite = + manifestFiles.stream() + .filter(mf -> manifestsToRewrite.contains(mf.path())) + .collect(Collectors.toList()); + manifestFilesToRewrite.forEach( + mf -> + Preconditions.checkArgument( + mf.path().startsWith(sourcePrefix), + "Encountered manifest file %s not under the source prefix %s", + mf.path(), + sourcePrefix)); + + try (FileAppender<ManifestFile> writer = + ManifestLists.write( + tableMetadata.formatVersion(), + outputFile, + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.sequenceNumber())) { + + for (ManifestFile file : manifestFilesToRewrite) { + ManifestFile newFile = file.copy(); + ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix)); + writer.add(newFile); + + result.toRewrite().add(file); + result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path())); + } + return result; + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to rewrite the manifest list 
file " + snapshot.manifestListLocation(), e); + } + } + + private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot snapshot) { + String path = snapshot.manifestListLocation(); + List<ManifestFile> manifestFiles = Lists.newLinkedList(); + try { + manifestFiles = ManifestLists.read(io.newInputFile(path)); + } catch (RuntimeIOException e) { + LOG.warn("Failed to read manifest list {}", path, e); + } + return manifestFiles; + } + + /** + * Rewrite a data manifest, replacing path references. + * + * @param manifestFile source manifest file to rewrite + * @param outputFile output file to rewrite manifest file to + * @param io file io + * @param format format of the manifest file + * @param specsById map of partition specs by id + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return a copy plan of content files in the manifest that was rewritten + */ + public static List<Pair<String, String>> rewriteManifest( + ManifestFile manifestFile, + OutputFile outputFile, + FileIO io, + int format, + Map<Integer, PartitionSpec> specsById, + String sourcePrefix, + String targetPrefix) + throws IOException { + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + try (ManifestWriter<DataFile> writer = + ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader<DataFile> reader = + ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer)) + .collect(Collectors.toList()); + } + } + + /** + * Rewrite a delete manifest, replacing path references. 
+ * + * @param manifestFile source delete manifest to rewrite + * @param outputFile output file to rewrite manifest file to + * @param io file io + * @param format format of the manifest file + * @param specsById map of partition specs by id + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @param stagingLocation staging location for rewritten files (referred delete file will be + * rewritten here) + * @return a copy plan of content files in the manifest that was rewritten + */ + public static RewriteResult<DeleteFile> rewriteDeleteManifest( + ManifestFile manifestFile, + OutputFile outputFile, + FileIO io, + int format, + Map<Integer, PartitionSpec> specsById, + String sourcePrefix, + String targetPrefix, + String stagingLocation) + throws IOException { + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + try (ManifestWriter<DeleteFile> writer = + ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader<DeleteFile> reader = + ManifestFiles.readDeleteManifest(manifestFile, io, specsById) + .select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map( + entry -> + writeDeleteFileEntry( + entry, spec, sourcePrefix, targetPrefix, stagingLocation, writer)) + .reduce(new RewriteResult<>(), RewriteResult::append); + } + } + + private static Pair<String, String> writeDataFileEntry( + ManifestEntry<DataFile> entry, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + ManifestWriter<DataFile> writer) { + DataFile dataFile = entry.file(); + String sourceDataFilePath = dataFile.location(); + Preconditions.checkArgument( + sourceDataFilePath.startsWith(sourcePrefix), + "Encountered data file %s not under the source prefix %s", + sourceDataFilePath, + sourcePrefix); + String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix); + DataFile newDataFile = + 
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build(); + appendEntryWithFile(entry, writer, newDataFile); + return Pair.of(sourceDataFilePath, newDataFile.location()); + } + + private static RewriteResult<DeleteFile> writeDeleteFileEntry( + ManifestEntry<DeleteFile> entry, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + String stagingLocation, + ManifestWriter<DeleteFile> writer) { + + DeleteFile file = entry.file(); + RewriteResult<DeleteFile> result = new RewriteResult<>(); + + switch (file.content()) { + case POSITION_DELETES: + String targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix); + Metrics metricsWithTargetPath = + ContentFileUtil.replacePathBounds(file, sourcePrefix, targetPrefix); + DeleteFile movedFile = + FileMetadata.deleteFileBuilder(spec) + .copy(file) + .withPath(targetDeleteFilePath) + .withMetrics(metricsWithTargetPath) + .build(); + appendEntryWithFile(entry, writer, movedFile); + result + .copyPlan() + .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location())); + result.toRewrite().add(file); + return result; + case EQUALITY_DELETES: + DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix); + appendEntryWithFile(entry, writer, eqDeleteFile); + // we do not need to recursively rewrite the equality delete, just move it Review Comment: minor: -> "No need to rewrite equality delete files as they do not contain absolute file paths."? ########## core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java: ########## @@ -60,28 +69,63 @@ public static CharSequence referencedDataFile(DeleteFile deleteFile) { return deleteFile.referencedDataFile(); } - int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); - Type pathType = MetadataColumns.DELETE_FILE_PATH.type(); - Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds(); - ByteBuffer lowerPathBound = lowerBounds != null ? 
lowerBounds.get(pathId) : null; + ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(PATH_ID) : null; if (lowerPathBound == null) { return null; } Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds(); - ByteBuffer upperPathBound = upperBounds != null ? upperBounds.get(pathId) : null; + ByteBuffer upperPathBound = upperBounds != null ? upperBounds.get(PATH_ID) : null; if (upperPathBound == null) { return null; } if (lowerPathBound.equals(upperPathBound)) { - return Conversions.fromByteBuffer(pathType, lowerPathBound); + return Conversions.fromByteBuffer(PATH_TYPE, lowerPathBound); } else { return null; } } + /** + * Replace file_path reference for a delete file entry + * + * @param deleteFile delete file whose entry will be replaced + * @param sourcePrefix source prefix which will be replaced + * @param targetPrefix target prefix which will replace it + * @return metrics for the new delete file entry + */ + public static Metrics replacePathBounds( + DeleteFile deleteFile, String sourcePrefix, String targetPrefix) { + Preconditions.checkArgument( + deleteFile.content() == FileContent.POSITION_DELETES, + "Only position delete files supported"); + + Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds(); + ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(PATH_ID) : null; + if (lowerPathBound == null) { + return metricsWithoutPathBounds(deleteFile); + } + + Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds(); + ByteBuffer upperPathBound = upperBounds != null ? upperBounds.get(PATH_ID) : null; + if (upperPathBound == null) { + return metricsWithoutPathBounds(deleteFile); + } + + if (lowerPathBound.equals(upperPathBound)) { Review Comment: I might be missing some context. Do we have to handle the case where the lower path bound does not equal the upper path bound? Or are they always the same for the file path in a position delete file? How does filtering work in that case? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org