nfsantos commented on code in PR #728:
URL: https://github.com/apache/jackrabbit-oak/pull/728#discussion_r989211860


##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to 
potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+
+    public Downloader(int concurrency) {
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        this.executorService = new ThreadPoolExecutor(
+                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("downloader-%d")
+                        .setDaemon(true)
+                        .build()
+        );
+    }
+
+    public List<ItemResponse> download(List<Item> items) {
+        LOG.debug("Preparing to download {} items.\n{}", items.size(), items);
+        try {
+            return executorService
+                    
.invokeAll(items.stream().map(DownloadWorker::new).collect(Collectors.toList()))
+                    .stream()
+                    .map(itemResponseFuture -> {
+                        try {
+                            return itemResponseFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            throw new RuntimeException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdown();
+    }
+
+    public static class Item {
+        public String source;
+        public String destination;
+
+        @Override
+        public String toString() {
+            return "Item{" +
+                    "source='" + source + '\'' +
+                    ", destination='" + destination + '\'' +
+                    '}';
+        }
+    }
+
+    public static class ItemResponse {
+        public final Item item;
+        public boolean failed;
+        public long size;
+        public long time;
+        public Throwable throwable;
+
+        public ItemResponse(Item item) {
+            this.item = item;
+        }
+    }
+
+    private static class DownloadWorker implements Callable<ItemResponse> {
+
+        private final Item item;
+
+        DownloadWorker(Item item) {
+            this.item = item;
+        }
+
+        @Override
+        public ItemResponse call() {
+            ItemResponse response  = new ItemResponse(item);
+            long t0 = System.currentTimeMillis();
+            try {
+                URL sourceUrl = new URL(item.source);
+                File destinationFile = new File(item.destination);
+                destinationFile.getParentFile().mkdirs();
+                try (ReadableByteChannel byteChannel = 
Channels.newChannel(sourceUrl.openStream());
+                     FileOutputStream outputStream = new 
FileOutputStream(destinationFile)) {
+                    response.size = outputStream.getChannel()
+                            .transferFrom(byteChannel, 0, Long.MAX_VALUE);

Review Comment:
   I don't think this will do a direct copy. The `byteChannel` here is an 
instance of `ReadableByteChannelImpl` (created by `Channels.newChannel`), which 
wraps a generic `InputStream` and so has no knowledge of that stream's 
implementation details. The `transferFrom` method cannot tell that this input 
stream is backed by a socket, so it will likely fall back to copying the data 
through a buffer on the JVM heap. Here is the source code for 
`FileChannelImpl#transferFrom` (in JDK 19):
   
   ```
           if (src instanceof FileChannelImpl fci) {
               long n;
               if ((n = transferFromDirectly(fci, position, count)) >= 0)
                   return n;
               if ((n = transferFromFileChannel(fci, position, count)) >= 0)
                   return n;
           }
   
           return transferFromArbitraryChannel(src, position, count);
   ```
   
   So it will fall through to `transferFromArbitraryChannel`, which just does a 
normal copy.
   
   I found examples of doing a direct copy from a `SocketChannel`, but I'm not 
sure whether it is possible to obtain a `SocketChannel` from a URL connection.
   
   Otherwise, if we do not use direct copy, it may be better to wrap the input 
and output streams in buffers, which may speed up the download.
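
   A minimal sketch of the non-direct alternative, assuming plain `java.io` 
streams are acceptable here. It copies through one explicit heap buffer, which 
serves the same purpose as wrapping both streams in buffered wrappers. The 
class name `ChunkedCopy`, the method `copy`, and the 64 KiB buffer size are 
illustrative assumptions, not part of the PR:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper for illustration only; not part of the PR.
final class ChunkedCopy {

    // Assumed buffer size; the right value should be measured on real downloads.
    private static final int BUFFER_SIZE = 64 * 1024;

    private ChunkedCopy() {
    }

    /**
     * Copies everything from in to out in BUFFER_SIZE chunks and returns the
     * number of bytes copied. The caller remains responsible for closing both
     * streams.
     */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }
}
```

   In `DownloadWorker.call()` the returned count could be assigned to 
`response.size`, as the `transferFrom` result is today.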



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an azure datastore using sas 
token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure 
format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+    private String sourceRepo;
+    private String includePath;
+    private File fileIncludePath;
+    private String sasToken;
+    private String outDir;
+    private int concurrency;
+
+    @Override
+    public void execute(String... args) throws Exception {
+        parseCommandLineParams(args);
+
+        List<String> ids;
+        if (fileIncludePath != null) {
+            if (fileIncludePath.isDirectory() || !fileIncludePath.exists()) {
+                throw new IllegalArgumentException("file-include-path must be 
a valid file");
+            }
+            ids = 
org.apache.commons.io.IOUtils.readLines(Files.newInputStream(fileIncludePath.toPath()),
 StandardCharsets.UTF_8);
+        } else {
+            ids = 
Arrays.stream(includePath.split(";")).collect(Collectors.toList());
+        }
+        if (ids.isEmpty()) {
+            throw new IllegalArgumentException("Blob ids must be specified");
+        }
+
+        try (Downloader downloader = new Downloader(concurrency)) {
+            long start = System.currentTimeMillis();

Review Comment:
   In rare cases, using `System.currentTimeMillis()` to measure a duration may 
produce wrong results. If the system clock is changed or adjusted between two 
calls to `currentTimeMillis()`, the measured time will deviate from the real 
time elapsed between the two calls. `System.nanoTime()` is a safer option.
   
   
https://www.javaadvent.com/2019/12/measuring-time-from-java-to-kernel-and-back.html#:~:text=2.%20CLOCK%20MONOTONICITY,2%20different%20JVMs.
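
   A minimal sketch of measuring the elapsed time with `System.nanoTime()`. The 
class `ElapsedTimer` and its method name are hypothetical and only illustrate 
the pattern; the value is converted to milliseconds so that the existing log 
statements could stay unchanged:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the PR.
final class ElapsedTimer {

    // nanoTime() is only meaningful as a difference between two readings.
    private final long startNanos = System.nanoTime();

    /** Returns the time elapsed since this timer was created, in milliseconds. */
    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }
}
```

   With such a helper, `long start = System.currentTimeMillis();` would become 
`ElapsedTimer timer = new ElapsedTimer();` and `totalTime` would be read from 
`timer.elapsedMillis()`.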



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to 
potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+
+    public Downloader(int concurrency) {
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        this.executorService = new ThreadPoolExecutor(
+                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("downloader-%d")
+                        .setDaemon(true)
+                        .build()
+        );
+    }
+
+    public List<ItemResponse> download(List<Item> items) {
+        LOG.debug("Preparing to download {} items.\n{}", items.size(), items);
+        try {
+            return executorService
+                    
.invokeAll(items.stream().map(DownloadWorker::new).collect(Collectors.toList()))
+                    .stream()
+                    .map(itemResponseFuture -> {
+                        try {
+                            return itemResponseFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            throw new RuntimeException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdown();
+    }
+
+    public static class Item {
+        public String source;
+        public String destination;
+
+        @Override
+        public String toString() {
+            return "Item{" +
+                    "source='" + source + '\'' +
+                    ", destination='" + destination + '\'' +
+                    '}';
+        }
+    }
+
+    public static class ItemResponse {
+        public final Item item;
+        public boolean failed;
+        public long size;
+        public long time;
+        public Throwable throwable;
+
+        public ItemResponse(Item item) {
+            this.item = item;
+        }
+    }
+
+    private static class DownloadWorker implements Callable<ItemResponse> {
+
+        private final Item item;
+
+        DownloadWorker(Item item) {
+            this.item = item;
+        }
+
+        @Override
+        public ItemResponse call() {
+            ItemResponse response  = new ItemResponse(item);
+            long t0 = System.currentTimeMillis();

Review Comment:
   Same issue with `currentTimeMillis()` as above.



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to 
potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+
+    public Downloader(int concurrency) {
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        this.executorService = new ThreadPoolExecutor(
+                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("downloader-%d")
+                        .setDaemon(true)
+                        .build()
+        );
+    }
+
+    public List<ItemResponse> download(List<Item> items) {
+        LOG.debug("Preparing to download {} items.\n{}", items.size(), items);
+        try {
+            return executorService
+                    
.invokeAll(items.stream().map(DownloadWorker::new).collect(Collectors.toList()))
+                    .stream()
+                    .map(itemResponseFuture -> {
+                        try {
+                            return itemResponseFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            throw new RuntimeException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdown();
+    }
+
+    public static class Item {
+        public String source;
+        public String destination;
+
+        @Override
+        public String toString() {
+            return "Item{" +
+                    "source='" + source + '\'' +
+                    ", destination='" + destination + '\'' +
+                    '}';
+        }
+    }
+
+    public static class ItemResponse {
+        public final Item item;
+        public boolean failed;
+        public long size;
+        public long time;
+        public Throwable throwable;
+
+        public ItemResponse(Item item) {
+            this.item = item;
+        }
+    }
+
+    private static class DownloadWorker implements Callable<ItemResponse> {
+
+        private final Item item;
+
+        DownloadWorker(Item item) {
+            this.item = item;
+        }
+
+        @Override
+        public ItemResponse call() {
+            ItemResponse response  = new ItemResponse(item);
+            long t0 = System.currentTimeMillis();
+            try {
+                URL sourceUrl = new URL(item.source);
+                File destinationFile = new File(item.destination);
+                destinationFile.getParentFile().mkdirs();

Review Comment:
   Check the return value of `mkdirs()` to ensure that the directories were 
created successfully and handle the error otherwise.
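
   A minimal sketch of such a check. Note that `mkdirs()` also returns `false` 
when the directory already exists, so the result has to be combined with 
`isDirectory()`. The class `ParentDirs` and the method `ensureParentExists` 
are hypothetical names used only for this example:

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper for illustration only; not part of the PR.
final class ParentDirs {

    private ParentDirs() {
    }

    /** Creates the parent directory of destination, or throws if that is not possible. */
    static void ensureParentExists(File destination) throws IOException {
        File parent = destination.getAbsoluteFile().getParentFile();
        if (parent == null) {
            // The destination is a file system root; there is nothing to create.
            return;
        }
        // mkdirs() returns false both on failure and when the directory already
        // exists, so only fail if the directory is still missing afterwards.
        if (!parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Could not create directory " + parent);
        }
    }
}
```

   Because `call()` already catches exceptions and records them in the 
`ItemResponse`, throwing here would presumably surface as a failed item.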



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to 
potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+
+    public Downloader(int concurrency) {
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        this.executorService = new ThreadPoolExecutor(
+                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("downloader-%d")
+                        .setDaemon(true)
+                        .build()
+        );
+    }
+
+    public List<ItemResponse> download(List<Item> items) {
+        LOG.debug("Preparing to download {} items.\n{}", items.size(), items);
+        try {
+            return executorService
+                    
.invokeAll(items.stream().map(DownloadWorker::new).collect(Collectors.toList()))
+                    .stream()
+                    .map(itemResponseFuture -> {
+                        try {
+                            return itemResponseFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            throw new RuntimeException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdown();
+    }
+
+    public static class Item {
+        public String source;
+        public String destination;
+
+        @Override
+        public String toString() {
+            return "Item{" +
+                    "source='" + source + '\'' +
+                    ", destination='" + destination + '\'' +
+                    '}';
+        }
+    }
+
+    public static class ItemResponse {
+        public final Item item;
+        public boolean failed;
+        public long size;
+        public long time;
+        public Throwable throwable;
+
+        public ItemResponse(Item item) {
+            this.item = item;
+        }
+    }
+
+    private static class DownloadWorker implements Callable<ItemResponse> {
+
+        private final Item item;
+
+        DownloadWorker(Item item) {
+            this.item = item;
+        }
+
+        @Override
+        public ItemResponse call() {
+            ItemResponse response  = new ItemResponse(item);
+            long t0 = System.currentTimeMillis();
+            try {
+                URL sourceUrl = new URL(item.source);
+                File destinationFile = new File(item.destination);
+                destinationFile.getParentFile().mkdirs();
+                try (ReadableByteChannel byteChannel = 
Channels.newChannel(sourceUrl.openStream());

Review Comment:
   What are the connect and read timeouts? Do we want to configure them 
explicitly? For inter-region transfers from the Azure blobstore, the response 
should always arrive within a few hundred milliseconds, so the timeouts should 
probably be relatively short. I'm not sure what the default timeouts are, but I 
have the impression that they are quite high. I saw somewhere 15s for connect 
and 60s for read, which may be excessive in this case.
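
   A minimal sketch of setting both timeouts explicitly, assuming the code 
keeps using `java.net.URL`. `URL.openStream()` is shorthand for 
`openConnection().getInputStream()`, so the timeouts can be set on the 
`URLConnection` before the stream is requested. As far as I know, the 
`URLConnection` Javadoc treats a timeout of 0 as infinite, which is worth 
verifying against the actual defaults. The class `TimedConnections` and both 
timeout values are assumptions made for this example:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;

// Hypothetical helper for illustration only; not part of the PR.
final class TimedConnections {

    // Assumed values; suitable timeouts depend on the deployment.
    static final int CONNECT_TIMEOUT_MS = 5_000;
    static final int READ_TIMEOUT_MS = 10_000;

    private TimedConnections() {
    }

    /** Returns a not yet connected URLConnection with explicit timeouts. */
    static URLConnection open(URL url) throws IOException {
        URLConnection connection = url.openConnection();
        connection.setConnectTimeout(CONNECT_TIMEOUT_MS);
        connection.setReadTimeout(READ_TIMEOUT_MS);
        return connection;
    }
}
```

   The worker would then call `TimedConnections.open(sourceUrl).getInputStream()` 
where it currently calls `sourceUrl.openStream()`.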



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to 
potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+
+    public Downloader(int concurrency) {

Review Comment:
   Check that `concurrency` is positive. Maybe also set a reasonable maximum, 
like 500 or 1000, to prevent a misconfiguration from killing the machine.
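
   A minimal sketch of such a check. The class `ConcurrencyCheck`, the method 
`validate`, and the limit of 1000 are assumptions for this example; the limit 
is simply taken from the range suggested above:

```java
// Hypothetical helper for illustration only; not part of the PR.
final class ConcurrencyCheck {

    // Assumed upper bound; pick whatever limit fits the target machines.
    static final int MAX_CONCURRENCY = 1000;

    private ConcurrencyCheck() {
    }

    /** Returns concurrency unchanged if it lies in [1, MAX_CONCURRENCY], otherwise throws. */
    static int validate(int concurrency) {
        if (concurrency < 1 || concurrency > MAX_CONCURRENCY) {
            throw new IllegalArgumentException(
                    "concurrency must be between 1 and " + MAX_CONCURRENCY + ", but was " + concurrency);
        }
        return concurrency;
    }
}
```

   The `Downloader` constructor could call `validate(concurrency)` before it 
creates the thread pool, so that an invalid value fails fast.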



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an azure datastore using sas 
token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure 
format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+    private String sourceRepo;
+    private String includePath;
+    private File fileIncludePath;
+    private String sasToken;
+    private String outDir;
+    private int concurrency;
+
+    @Override
+    public void execute(String... args) throws Exception {
+        parseCommandLineParams(args);
+
+        List<String> ids;
+        if (fileIncludePath != null) {
+            if (fileIncludePath.isDirectory() || !fileIncludePath.exists()) {
+                throw new IllegalArgumentException("file-include-path must be 
a valid file");
+            }
+            ids = 
org.apache.commons.io.IOUtils.readLines(Files.newInputStream(fileIncludePath.toPath()),
 StandardCharsets.UTF_8);
+        } else {
+            ids = 
Arrays.stream(includePath.split(";")).collect(Collectors.toList());
+        }
+        if (ids.isEmpty()) {
+            throw new IllegalArgumentException("Blob ids must be specified");
+        }
+
+        try (Downloader downloader = new Downloader(concurrency)) {
+            long start = System.currentTimeMillis();
+            List<Downloader.ItemResponse> responses = 
downloader.download(ids.stream().map(id -> {
+                Downloader.Item item = new Downloader.Item();
+                item.source = sourceRepo + "/" + id;
+                if (sasToken != null) {
+                    item.source += "?" + sasToken;
+                }
+                // Rename the blob names to match expected datastore cache 
format (remove the "-" in the name)
+                String blobName = id.replaceAll("-", "");
+                if (id.length() < 6) {
+                    LOG.warn("Blob with name {} is less than 6 chars. Cannot 
create data folder structure. Storing in the root folder", blobName);
+                    item.destination = outDir + FILE_SEPARATOR.value() + 
blobName;
+                } else {
+                    item.destination = outDir + FILE_SEPARATOR.value()
+                            + blobName.substring(0, 2) + 
FILE_SEPARATOR.value() + blobName.substring(2, 4) + FILE_SEPARATOR.value()
+                            + blobName.substring(4, 6) + 
FILE_SEPARATOR.value() + blobName;
+                }
+                return item;
+            }).collect(Collectors.toList()));
+            long totalTime = System.currentTimeMillis() - start;
+
+            Map<Boolean, List<Downloader.ItemResponse>> partitioned =
+                    responses.stream().collect(Collectors.partitioningBy(ir -> 
ir.failed));
+
+            List<Downloader.ItemResponse> success = partitioned.get(false);
+            if (!success.isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("The following blobs were successfully 
downloaded:");
+                    success.forEach(s -> LOG.debug("{} [{}] downloaded in {} 
ms", s.item.source,
+                            IOUtils.humanReadableByteCount(s.size), s.time));
+                }
+
+                long totalBytes = success.stream().mapToLong(s -> s.size).sum();
+                if (totalTime > 60000) {
+                    LOG.info("Elapsed Time (Minutes): {}", TimeUnit.MILLISECONDS.toMinutes(totalTime));
+                } else {
+                    LOG.info("Elapsed Time (Seconds): {}", TimeUnit.MILLISECONDS.toSeconds(totalTime));
+                }
+                LOG.info("Number of File Transfers: {}", success.size());
+                LOG.info("TotalBytesTransferred: {}[{}]", totalBytes, IOUtils.humanReadableByteCount(totalBytes));
+                if (totalBytes > 10_000_000) {
+                    LOG.info("Speed (MB/sec): {}", (totalBytes / (1024 * 1024) / ((double) totalTime / 1000)));
+                } else {
+                    LOG.info("Speed (KB/sec): {}", (totalBytes / (1024) / ((double) totalTime / 1000)));
+                }
+            }
+
+            List<Downloader.ItemResponse> failures = partitioned.get(true);
+            if (!failures.isEmpty()) {
+                LOG.error("The following blobs threw an error:");
+                failures.forEach(f -> LOG.error(f.item.source, f.throwable));
+                throw new IllegalStateException("Errors while downloading blobs");
+            }
+        }
+    }
+
+    private void parseCommandLineParams(String... args) {
+        OptionParser parser = new OptionParser();
+
+        // options available for get-blobs only
+        OptionSpec<String> sourceRepoOpt = parser.accepts("source-repo", "The source repository url")
+                .withRequiredArg().ofType(String.class).required();
+
+        OptionSpecBuilder includePathBuilder = parser.accepts("include-path",
+                "Include only these paths when copying (separated by semicolon)");
+        OptionSpecBuilder fileIncludePathBuilder = parser.accepts("file-include-path",
+                "Include only the paths specified in the file (separated by newline)");
+        parser.mutuallyExclusive(includePathBuilder, fileIncludePathBuilder);
+        OptionSpec<String> includePathOpt = includePathBuilder.withRequiredArg().ofType(String.class);
+        OptionSpec<File> fileIncludePathOpt = fileIncludePathBuilder.withRequiredArg().ofType(File.class);

Review Comment:
   Is it possible to use `Path` instead of `File`? `Path` is newer and has a 
cleaner API. The code above already has to convert the `File` to a `Path`, so 
that conversion could be avoided.
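   
   A minimal sketch of what a `Path`-based check could look like, using only 
`java.nio.file`. The names `IncludeFileCheck` and `requireRegularFile` are 
hypothetical and not part of the PR, and how the option parser would produce 
the `Path` is left out here.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the PR: validates the file-include-path
// argument as a Path so that no File-to-Path conversion is needed later.
final class IncludeFileCheck {

    static Path requireRegularFile(String arg) {
        Path path = Paths.get(arg);
        // isRegularFile is false for directories and for missing files, which
        // covers the isDirectory() || !exists() check in the diff. It also
        // rejects other non-regular files such as devices.
        if (!Files.isRegularFile(path)) {
            throw new IllegalArgumentException("file-include-path must be a valid file");
        }
        return path;
    }
}
```

   The returned `Path` can then be handed directly to the `java.nio.file.Files` 
methods without calling `toPath()`.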



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an azure datastore using sas token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+    private String sourceRepo;
+    private String includePath;
+    private File fileIncludePath;
+    private String sasToken;
+    private String outDir;
+    private int concurrency;
+
+    @Override
+    public void execute(String... args) throws Exception {
+        parseCommandLineParams(args);
+
+        List<String> ids;
+        if (fileIncludePath != null) {
+            if (fileIncludePath.isDirectory() || !fileIncludePath.exists()) {
+                throw new IllegalArgumentException("file-include-path must be a valid file");
+            }
+            ids = org.apache.commons.io.IOUtils.readLines(Files.newInputStream(fileIncludePath.toPath()), StandardCharsets.UTF_8);

Review Comment:
   `Files.readAllLines()` does the same and avoids using a third-party library.
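   
   A minimal sketch of the JDK-only variant, assuming the include file is 
available as a `Path`. The names `BlobIdReader` and `readIds` are hypothetical 
and used only for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper, not part of the PR.
final class BlobIdReader {

    // Reads one blob id per line. Files.readAllLines opens and closes the
    // file itself, so there is no stream left for the caller to manage.
    static List<String> readIds(Path includeFile) throws IOException {
        return Files.readAllLines(includeFile, StandardCharsets.UTF_8);
    }
}
```

   UTF-8 is also the default charset of the one-argument overload, so passing it 
explicitly only documents the intent.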



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java:
##########
@@ -0,0 +1,162 @@
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an azure datastore using sas token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+    private String sourceRepo;
+    private String includePath;
+    private File fileIncludePath;
+    private String sasToken;
+    private String outDir;
+    private int concurrency;
+
+    @Override
+    public void execute(String... args) throws Exception {
+        parseCommandLineParams(args);
+
+        List<String> ids;
+        if (fileIncludePath != null) {
+            if (fileIncludePath.isDirectory() || !fileIncludePath.exists()) {
+                throw new IllegalArgumentException("file-include-path must be a valid file");
+            }
+            ids = org.apache.commons.io.IOUtils.readLines(Files.newInputStream(fileIncludePath.toPath()), StandardCharsets.UTF_8);
+        } else {
+            ids = Arrays.stream(includePath.split(";")).collect(Collectors.toList());
+        }
+        if (ids.isEmpty()) {
+            throw new IllegalArgumentException("Blob ids must be specified");
+        }
+
+        try (Downloader downloader = new Downloader(concurrency)) {
+            long start = System.currentTimeMillis();
+            List<Downloader.ItemResponse> responses = downloader.download(ids.stream().map(id -> {

Review Comment:
   Before `download` is called and any transfer starts, this code walks through 
all the files that have to be transferred and prepares them for download. Once 
that list is built, `download` traverses the full list of files again to create 
a new list of `DownloadWorker`. That second traversal is relatively light (it 
just creates a new object per item), but the first pass done here can be slow, 
as it can potentially write a log statement. 
   
   Maybe this is not an issue in practice, but I think it would be more 
efficient to avoid building these intermediate lists and instead pass the blobs 
to the downloader one by one. This would require a new method `download(Item)`, 
which would just add a new task to the executor for that blob.
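   
   A rough sketch of the one-by-one approach, under stated assumptions: 
`StreamingDownloader`, its nested `Item`, `waitForAll` and `transfer` are 
hypothetical names, the transfer is a placeholder that only returns a string, 
and none of this is the `Downloader` from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not the Downloader class from the PR.
final class StreamingDownloader implements AutoCloseable {

    static final class Item {
        final String source;
        final String destination;

        Item(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    private final ExecutorService executor;
    private final List<Future<String>> pending = new ArrayList<>();

    StreamingDownloader(int concurrency) {
        this.executor = Executors.newFixedThreadPool(concurrency);
    }

    // Queues a single item. The task can start running while the caller is
    // still preparing the next item, so no intermediate list of items is built.
    void download(Item item) {
        Callable<String> task = () -> transfer(item);
        pending.add(executor.submit(task));
    }

    // Blocks until every queued task has finished; results come back in
    // submission order.
    List<String> waitForAll() throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<>();
        for (Future<String> future : pending) {
            results.add(future.get());
        }
        pending.clear();
        return results;
    }

    // Placeholder for the real transfer; it only describes what would be copied.
    private String transfer(Item item) {
        return item.source + " -> " + item.destination;
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

   The caller would invoke `download(item)` inside the loop that computes each 
destination, then call `waitForAll()` once to collect the responses.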


