LakshSingla commented on code in PR #15953:
URL: https://github.com/apache/druid/pull/15953#discussion_r1531505943

##########
docs/multi-stage-query/reference.md:
##########
@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
 This variation of EXTERN requires one argument, the details of the destination as specified below.
 This variation additionally requires an `AS` clause to specify the format of the exported rows.
+While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
+- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.

Review Comment:
   What is the symlink manifest format? I wasn't able to find a definitive answer while searching "symlink manifest format"; therefore, some clarification would be helpful.
   Also, is it for Druid's internal use, or can other systems and operators make use of the manifest file created?

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());

Review Comment:
   nit: The sentence should make sense when reading without the interpolation.
   ```suggestion
       log.info("Writing manifest file at location[%s]", exportStorageProvider.getBasePath());
   ```

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -107,7 +112,14 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     if (inputSliceReader.numReadableInputs(slice) == 0) {
-      return new ProcessorsAndChannels<>(ProcessorManagers.none(), OutputChannels.none());
+      return new ProcessorsAndChannels<>(
+          ProcessorManagers.of(Sequences.<ExportResultsFrameProcessor>empty())
+                           .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                             ((ArrayList<String>) acc).add((String) file);
+                             return acc;
+                           }),

Review Comment:
   Seems confusing; do we need this?

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -88,7 +93,7 @@ public ExportStorageProvider getExportStorageProvider()
   }
 
   @Override
-  public ProcessorsAndChannels<Object, Long> makeProcessors(
+  public ProcessorsAndChannels<Object, Object> makeProcessors(

Review Comment:
   Why not
   ```suggestion
     public ProcessorsAndChannels<Object, List<String>> makeProcessors(
   ```

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");
+    }
+
+    createManifestFile(storageConnector, exportedFiles);
+    createDruidMetadataFile(storageConnector);

Review Comment:
   What happens if the previous call succeeds and this one fails? Would we end up in a partial state where the manifest is created but the metadata isn't?
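A minimal sketch of one way to avoid that partial state, offered under assumptions: it targets a local filesystem through `java.nio.file` rather than Druid's `StorageConnector`, and the class name `ManifestCommitSketch`, the `manifest.tmp` staging name, and the JSON content of `druid_export_meta` are all hypothetical. The idea is to write the metadata file first and publish the manifest last through an atomic rename, so a reader that finds a manifest can assume the metadata file is already there. Object stores such as S3 have no atomic rename, so a real implementation there would need a different commit marker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/**
 * Hypothetical sketch, not the PR's code: publish the manifest last so that a visible
 * manifest implies the Druid metadata file was already written. Uses the local filesystem
 * through java.nio.file instead of Druid's StorageConnector.
 */
final class ManifestCommitSketch
{
  static final String SYMLINK_DIR = "_symlink_format_manifest";

  private ManifestCommitSketch()
  {
  }

  static void writeMetadata(Path exportRoot, List<String> exportedFiles, int manifestVersion)
  {
    try {
      final Path dir = exportRoot.resolve(SYMLINK_DIR);
      final Path manifest = dir.resolve("manifest");
      final Path meta = dir.resolve("druid_export_meta");
      Files.createDirectories(dir);
      if (Files.exists(manifest) || Files.exists(meta)) {
        throw new IllegalStateException("Export metadata already exists under [" + dir + "]");
      }

      // Step 1: write the metadata file. If this fails, no manifest exists yet,
      // so readers never see a manifest without its metadata file.
      // The JSON key below is hypothetical.
      Files.write(meta, List.of("{\"manifestFileVersion\": " + manifestVersion + "}"), StandardCharsets.UTF_8);

      // Step 2: stage the manifest (one path per line) under a temporary name,
      // then rename it into place in a single atomic step.
      final Path staged = dir.resolve("manifest.tmp");
      Files.write(staged, exportedFiles, StandardCharsets.UTF_8);
      Files.move(staged, manifest, StandardCopyOption.ATOMIC_MOVE);
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```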

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");
+    }
+
+    createManifestFile(storageConnector, exportedFiles);
+    createDruidMetadataFile(storageConnector);
+  }
+
+  /**
+   * Creates a manifest file containing the list of files created by the export query. The manifest file consists of a
+   * new line separated list. Each line contains the absolute path to a file created by the export.
+   */
+  public void createManifestFile(StorageConnector storageConnector, List<String> exportedFiles) throws IOException
+  {
+    try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(MANIFEST_FILE), StandardCharsets.UTF_8))) {
+      for (String exportedFile : exportedFiles) {
+        printWriter.println(exportStorageProvider.getFilePathForManifest(exportedFile));
+      }
+    }
+  }
+
+  /**
+   * Creates a druid metadata file at the export location. This file contains extra information about the export, which
+   * cannot be stored in the manifest directly, so that it can follow the symlink format.
+   * <br>
+   * Currently, this only contains the manifest file version.
+   */
+  private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException

Review Comment:
   Seems like we are writing the results in an ad-hoc format. I think it makes sense to use one of the standard formats like JSON, YAML, etc., if this is a user-facing file. Otherwise, we should remove it from the documentation as well, since it is an implementation detail.

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");

Review Comment:
   Why is it a defensive check? A user can create a manifest file manually, and the job will fail. Then it isn't a defensive check. We should use something relevant to either the users or the operator here.
   I think it makes sense that we don't expect to encounter it, given that the files would be namespaced with the task ID; however, it still shouldn't be a defensive check.

##########
docs/multi-stage-query/reference.md:
##########
@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
 This variation of EXTERN requires one argument, the details of the destination as specified below.
 This variation additionally requires an `AS` clause to specify the format of the exported rows.
+While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
+- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.
+```text
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition0.csv
+...
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
+```
+- `_symlink_format_manifest/druid_export_meta`: Used to store additional information about the export metadata, such as the version of the manifest file format.

Review Comment:
   Is this version for internal use, or does it have relevance outside of Druid as well?
   Also, can you please add the format of this metadata file?

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -127,11 +139,46 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     return new ProcessorsAndChannels<>(
-        ProcessorManagers.of(processors),
+        ProcessorManagers.of(processors)
+                         .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                           ((ArrayList<String>) acc).add((String) file);
+                           return acc;
+                         }),
         OutputChannels.none()
     );
   }
 
+  @Nullable
+  @Override
+  public TypeReference<Object> getResultTypeReference()
+  {
+    return new TypeReference<Object>() {};

Review Comment:
   ```suggestion
       return new TypeReference<List<String>>() {};
   ```

##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -127,11 +139,46 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     return new ProcessorsAndChannels<>(
-        ProcessorManagers.of(processors),
+        ProcessorManagers.of(processors)
+                         .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                           ((ArrayList<String>) acc).add((String) file);
+                           return acc;
+                         }),
         OutputChannels.none()
     );
   }
 
+  @Nullable
+  @Override
+  public TypeReference<Object> getResultTypeReference()
+  {
+    return new TypeReference<Object>() {};
+  }
+
+  @Override
+  public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
+  {
+    // Maintain upgrade compatibility, if a worker does not return a list, ignore it.

Review Comment:
   Can we end up in a state where some files are present and some are not? How would anyone know that files are absent?
   I think if the manifest file is for cosmetic purposes only, we can get away with partial results, but it should be explicitly called out in the documentation. Since it's in a widely used format, the expectation seems to be that users would be able to ingest the results into other systems, like:
   Druid -> Export destination -> External system
   This will lead to incorrect results without the users knowing.
   I think we should fail the job here. If not, the usefulness of the manifest file is limited by its correctness, and that should be called out explicitly.
   We can also have a context parameter (which I am not a big fan of) that switches between error-throwing and ignoring behaviour, for those who want correctness.
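A minimal sketch of the fail-the-job option, assuming each worker's accumulated result is a `List<String>` of exported paths (the type suggested earlier in this review). Any other value, such as a result from a worker that still reports the previous `Long` result type, makes the merge throw instead of being silently dropped, so the manifest cannot quietly omit a worker's files. `ExportResultMergeSketch` is a hypothetical name, and `IllegalStateException` is used only to keep the sketch self-contained; a real change would more likely raise a user- or operator-facing `DruidException`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not the PR's code: a strict merge of per-worker exported-file lists
 * that fails instead of ignoring results that are not lists of paths.
 */
final class ExportResultMergeSketch
{
  private ExportResultMergeSketch()
  {
  }

  static List<String> merge(Object accumulated, Object otherAccumulated)
  {
    final List<String> merged = new ArrayList<>(asFileList(accumulated));
    merged.addAll(asFileList(otherAccumulated));
    return merged;
  }

  private static List<String> asFileList(Object result)
  {
    if (!(result instanceof List)) {
      // A worker that does not report its files (for example, an older worker during a rolling
      // upgrade) would leave the manifest incomplete, so fail rather than skip it.
      throw new IllegalStateException(
          "Expected a list of exported files but got [" + (result == null ? "null" : result.getClass().getName()) + "]"
      );
    }
    final List<String> files = new ArrayList<>();
    for (Object file : (List<?>) result) {
      if (!(file instanceof String)) {
        throw new IllegalStateException("Expected a file path string but got [" + file + "]");
      }
      files.add((String) file);
    }
    return files;
  }
}
```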
