LakshSingla commented on code in PR #15953:
URL: https://github.com/apache/druid/pull/15953#discussion_r1531505943


##########
docs/multi-stage-query/reference.md:
##########
@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
 This variation of EXTERN requires one argument, the details of the destination as specified below.
 This variation additionally requires an `AS` clause to specify the format of the exported rows.
 
+While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
+- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.

Review Comment:
   What is the symlink manifest format? I wasn't able to find a definitive answer while searching for "symlink manifest format", so some clarification would be helpful.
   
   Also, is it for Druid's internal use, or can other systems and operators make use of the generated manifest file?
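For context on the question above: the Javadoc of `createManifestFile` later in this PR describes the manifest as a newline-separated list in which each line is the absolute path of one exported file. The directory name appears to follow the convention of Hive's `SymlinkTextInputFormat`, which Delta Lake also uses for the `_symlink_format_manifest` manifests it generates for engines such as Presto and Athena; whether Druid aims for full compatibility with those readers is what this comment asks. Assuming the format is just one path per line with no header, an external consumer could read it roughly as in this sketch (`ManifestReader` is a hypothetical name, not a Druid class):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader for a manifest that lists one absolute file path per line. */
class ManifestReader
{
  /** Reads every non-blank line as a file path, preserving the order in the manifest. */
  static List<String> readManifest(BufferedReader reader) throws IOException
  {
    final List<String> paths = new ArrayList<>();
    String line;
    while ((line = reader.readLine()) != null) {
      final String path = line.trim();
      if (!path.isEmpty()) { // tolerate blank lines defensively
        paths.add(path);
      }
    }
    return paths;
  }

  /** Convenience overload for manifest contents already loaded into memory. */
  static List<String> readManifest(String contents)
  {
    try (BufferedReader reader = new BufferedReader(new StringReader(contents))) {
      return readManifest(reader);
    }
    catch (IOException e) {
      // Not expected for an in-memory StringReader, but readLine() declares IOException.
      throw new UncheckedIOException(e);
    }
  }
}
```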



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());

Review Comment:
   nit: The log message should make sense when read without the interpolated value.
   ```suggestion
       log.info("Writing manifest file at location[%s]", exportStorageProvider.getBasePath());
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -107,7 +112,14 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     if (inputSliceReader.numReadableInputs(slice) == 0) {
-      return new ProcessorsAndChannels<>(ProcessorManagers.none(), OutputChannels.none());
+      return new ProcessorsAndChannels<>(
+          ProcessorManagers.of(Sequences.<ExportResultsFrameProcessor>empty())
+                           .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                             ((ArrayList<String>) acc).add((String) file);
+                             return acc;
+                           }),

Review Comment:
   This seems confusing. Do we need this?
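One reading of why the zero-input branch sets up an accumulation at all (an assumption, not confirmed in this thread): the controller merges every worker's result, so even a worker with no readable inputs may need to report an empty file list rather than no result. A left fold over zero items simply returns its initial value, which this stdlib-only sketch demonstrates. It mimics the shape of `withAccumulation(initial, fn)` over `Sequences.<ExportResultsFrameProcessor>empty()` with a hypothetical `fold` helper instead of Druid's `ProcessorManagers`/`Sequences`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical stand-in for accumulating per-processor results into one value. */
class AccumulateSketch
{
  /** Left fold: starts from {@code initial} and applies {@code step} to each item in order. */
  static <T, A> A fold(Iterable<T> items, A initial, BiFunction<A, T, A> step)
  {
    A acc = initial;
    for (T item : items) {
      acc = step.apply(acc, item);
    }
    return acc;
  }

  /** Collects exported file names; with no inputs the result is an empty list, never null. */
  static List<String> collectExportedFiles(Iterable<String> files)
  {
    return fold(files, new ArrayList<String>(), (acc, file) -> {
      acc.add(file);
      return acc;
    });
  }
}
```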



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -88,7 +93,7 @@ public ExportStorageProvider getExportStorageProvider()
   }
 
   @Override
-  public ProcessorsAndChannels<Object, Long> makeProcessors(
+  public ProcessorsAndChannels<Object, Object> makeProcessors(

Review Comment:
   Why not 
   ```suggestion
     public ProcessorsAndChannels<Object, List<String>> makeProcessors(
   ```
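To illustrate the suggestion: once the result type is `List<String>`, merging worker results can be written without the `(ArrayList<String>)` and `(String)` casts seen in this diff. A minimal sketch; `TypedResultSketch.mergeAccumulatedResult` is a hypothetical stand-in and does not claim to match the exact generic signature of Druid's processor factory interface:

```java
import java.util.ArrayList;
import java.util.List;

class TypedResultSketch
{
  /** Merges two workers' file lists into a new list; the typed signature needs no casts. */
  static List<String> mergeAccumulatedResult(List<String> accumulated, List<String> otherAccumulated)
  {
    final List<String> merged = new ArrayList<>(accumulated.size() + otherAccumulated.size());
    merged.addAll(accumulated);
    merged.addAll(otherAccumulated);
    return merged;
  }
}
```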



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");
+    }
+
+    createManifestFile(storageConnector, exportedFiles);
+    createDruidMetadataFile(storageConnector);

Review Comment:
   What happens if the previous call succeeds and this one fails? Would we end up in a partial state where the manifest is created but the metadata isn't?
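One possible way to narrow the partial-state window, sketched under assumptions: write the auxiliary metadata file first and the manifest last, staging the manifest under a temporary name and renaming it into place, so that a reader can treat the manifest's presence as meaning the metadata is complete. The sketch uses `java.nio.file` on a local directory as a stand-in for `StorageConnector`; object stores such as S3 do not offer an atomic rename, so there the ordering alone would have to carry the guarantee. `MetadataWriteSketch` is hypothetical, and the metadata contents are passed in rather than guessed:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical local-filesystem sketch of "metadata first, manifest last". */
class MetadataWriteSketch
{
  static final String SYMLINK_DIR = "_symlink_format_manifest";

  static void writeMetadata(Path baseDir, List<String> exportedFiles, String metaContents) throws IOException
  {
    final Path dir = Files.createDirectories(baseDir.resolve(SYMLINK_DIR));

    // Step 1: the auxiliary metadata file. If this fails, no manifest exists yet.
    Files.write(dir.resolve("druid_export_meta"), metaContents.getBytes(StandardCharsets.UTF_8));

    // Step 2: stage the manifest (one path per line) under a temporary name...
    final Path staged = dir.resolve("manifest.tmp");
    Files.write(staged, exportedFiles, StandardCharsets.UTF_8);

    // ...then rename it into place. ATOMIC_MOVE throws AtomicMoveNotSupportedException
    // on file systems that cannot rename atomically, rather than silently copying.
    Files.move(staged, dir.resolve("manifest"), StandardCopyOption.ATOMIC_MOVE);
  }
}
```

Note that a retry after a failure between the two steps would still trip the existing-file check in `writeMetadata`, so cleanup on failure would need to be decided separately.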



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");
+    }
+
+    createManifestFile(storageConnector, exportedFiles);
+    createDruidMetadataFile(storageConnector);
+  }
+
+  /**
+   * Creates a manifest file containing the list of files created by the export query. The manifest file consists of a
+   * new line separated list. Each line contains the absolute path to a file created by the export.
+   */
+  public void createManifestFile(StorageConnector storageConnector, List<String> exportedFiles) throws IOException
+  {
+    try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(MANIFEST_FILE), StandardCharsets.UTF_8))) {
+      for (String exportedFile : exportedFiles) {
+        printWriter.println(exportStorageProvider.getFilePathForManifest(exportedFile));
+      }
+    }
+  }
+
+  /**
+   * Creates a druid metadata file at the export location. This file contains extra information about the export, which
+   * cannot be stored in the manifest directly, so that it can follow the symlink format.
+   * <br>
+   * Currently, this only contains the manifest file version.
+   */
+  private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException

Review Comment:
   It seems like we are writing this file in an ad-hoc format. If this is a user-facing file, I think it makes sense to use a standard format such as JSON or YAML. Otherwise, we should remove it from the documentation as well, since it is an implementation detail.
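If the file is meant to be user-facing, a standard format as suggested could be as small as a one-field JSON object. In this sketch the field name `manifestFileVersion` is a hypothetical choice, and the string is built by hand only to stay dependency-free; Druid code would more naturally serialize a small POJO with Jackson's `ObjectMapper`:

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical JSON rendering of the export metadata file. */
class ExportMetaJsonSketch
{
  /** Renders the metadata as a single JSON object containing only the manifest version. */
  static String toJson(int manifestFileVersion)
  {
    // Only an int is interpolated, so no JSON string escaping is required.
    return "{\"manifestFileVersion\":" + manifestFileVersion + "}";
  }

  /** UTF-8 bytes suitable for passing to an output stream such as StorageConnector#write. */
  static byte[] toJsonBytes(int manifestFileVersion)
  {
    return toJson(manifestFileVersion).getBytes(StandardCharsets.UTF_8);
  }
}
```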



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.defensive("Found existing manifest file already present at path.");

Review Comment:
   Why is this a defensive check? A user can create a manifest file manually, which would make the job fail, so it isn't really a defensive check. We should use an error that is relevant to either the user or the operator here. It makes sense that we don't expect to encounter this, given that the files would be namespaced with the task ID; however, it still shouldn't be a defensive check.
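A sketch of the distinction being drawn: a pre-existing manifest is a condition a user can cause, so the error should tell the user what to do instead of signalling an internal bug. The exception type and message below are hypothetical; in Druid itself this would presumably go through `DruidException`'s persona and category builder rather than a custom class. `pathExists` stands in for `StorageConnector#pathExists`:

```java
import java.util.function.Predicate;

class ExportDestinationCheck
{
  /** Hypothetical user-facing error for a destination that already contains export metadata. */
  static final class ExportDestinationConflictException extends RuntimeException
  {
    ExportDestinationConflictException(String message)
    {
      super(message);
    }
  }

  /** Fails with an actionable message if any of the given metadata paths already exists. */
  static void ensureNoExistingMetadata(Predicate<String> pathExists, String basePath, String... metadataPaths)
  {
    for (String path : metadataPaths) {
      if (pathExists.test(path)) {
        throw new ExportDestinationConflictException(String.format(
            "Found existing export metadata file[%s] at location[%s]. "
            + "Remove it or choose an empty export destination, then retry the query.",
            path, basePath));
      }
    }
  }
}
```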



##########
docs/multi-stage-query/reference.md:
##########
@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
 This variation of EXTERN requires one argument, the details of the destination as specified below.
 This variation additionally requires an `AS` clause to specify the format of the exported rows.
 
+While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
+- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.
+```text
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition0.csv
+...
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
+```
+- `_symlink_format_manifest/druid_export_meta`: Used to store additional information about the export metadata, such as the version of the manifest file format.

Review Comment:
   Is this version for internal use, or is it relevant outside of Druid as well? Also, can you please document the format of this metadata file?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -127,11 +139,46 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     return new ProcessorsAndChannels<>(
-        ProcessorManagers.of(processors),
+        ProcessorManagers.of(processors)
+                         .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                           ((ArrayList<String>) acc).add((String) file);
+                           return acc;
+                         }),
         OutputChannels.none()
     );
   }
 
+  @Nullable
+  @Override
+  public TypeReference<Object> getResultTypeReference()
+  {
+    return new TypeReference<Object>() {};

Review Comment:
   ```suggestion
       return new TypeReference<List<String>>() {};
   ```
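Why `TypeReference<List<String>>` is more informative than `TypeReference<Object>`: Jackson's `TypeReference` relies on an anonymous subclass recording its generic type argument, which the deserializer then reads; with `Object`, Jackson has no element type to target and falls back to a generic representation. The stdlib-only imitation below (`TypeToken` is a hypothetical name, not a Jackson class) shows that the full `List<String>` type survives to runtime this way:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Minimal "super type token": captures T from the anonymous subclass's generic superclass. */
abstract class TypeToken<T>
{
  private final Type type;

  protected TypeToken()
  {
    final Type superclass = getClass().getGenericSuperclass();
    if (!(superclass instanceof ParameterizedType)) {
      throw new IllegalStateException("TypeToken must be created with a type argument");
    }
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  Type getType()
  {
    return type;
  }
}
```

Usage mirrors the suggestion: `new TypeToken<List<String>>() {}.getType()` yields the parameterized type `java.util.List<java.lang.String>`, while `new TypeToken<Object>() {}` yields only `Object.class`.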



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java:
##########
@@ -127,11 +139,46 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     );
 
     return new ProcessorsAndChannels<>(
-        ProcessorManagers.of(processors),
+        ProcessorManagers.of(processors)
+                         .withAccumulation(new ArrayList<String>(), (acc, file) -> {
+                           ((ArrayList<String>) acc).add((String) file);
+                           return acc;
+                         }),
         OutputChannels.none()
     );
   }
 
+  @Nullable
+  @Override
+  public TypeReference<Object> getResultTypeReference()
+  {
+    return new TypeReference<Object>() {};
+  }
+
+  @Override
+  public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
+  {
+    // Maintain upgrade compatibility, if a worker does not return a list, ignore it.

Review Comment:
   Can we end up in a state where some exported files are listed in the manifest and some are not? How would anyone tell that files are missing? If the manifest file is for cosmetic purposes only, we can get away with partial results, but that should be called out explicitly in the documentation.
   
   Since it's in a widely used format, the expectation seems to be that users would be able to ingest the results into other systems, like:
   Druid -> Export destination -> External system
   
   This would lead to incorrect results without the users knowing. I think we should fail the job here. If not, the usefulness of the manifest file is limited by its correctness, and that should be called out explicitly. We could also add a context parameter (which I am not a big fan of) that switches between throwing an error and ignoring the missing results, for those who want correctness.
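A sketch of the two behaviours being weighed, assuming a boolean that would come from the (hypothetical) context parameter mentioned above: in strict mode a worker result that is not a list of file names fails the merge, for example the `Long` that the previous `ProcessorsAndChannels<Object, Long>` signature implies an older worker would return, while lenient mode keeps the skip-it behaviour from this diff. `ManifestMergeSketch` and `strict` are illustrative names, not existing Druid APIs:

```java
import java.util.ArrayList;
import java.util.List;

class ManifestMergeSketch
{
  /**
   * Merges two worker results. Each is expected to be a List of file paths; anything else
   * (e.g. a result from a worker running an older version) either fails the merge or is skipped.
   */
  static List<String> merge(Object accumulated, Object otherAccumulated, boolean strict)
  {
    final List<String> merged = new ArrayList<>();
    addFiles(merged, accumulated, strict);
    addFiles(merged, otherAccumulated, strict);
    return merged;
  }

  private static void addFiles(List<String> target, Object result, boolean strict)
  {
    if (result instanceof List) {
      for (Object file : (List<?>) result) {
        target.add(String.valueOf(file));
      }
    } else if (strict) {
      throw new IllegalStateException(
          "Worker returned no file list (result[" + result + "]); the manifest would be incomplete.");
    }
    // Lenient mode: skip silently, which can leave exported files out of the manifest.
  }
}
```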



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
