This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new de5a4bafcb Zero-copy local deep storage. (#13394)
de5a4bafcb is described below

commit de5a4bafcbad7c05abd9ca528ab157e6a121dbbf
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Dec 12 17:28:24 2022 -0800

    Zero-copy local deep storage. (#13394)
    
    * Zero-copy local deep storage.
    
    This is useful for local deep storage, since it reduces disk usage and
    makes Historicals able to load segments instantaneously.
    
    Two changes:
    
    1) Introduce "druid.storage.zip" parameter for local storage, which defaults
       to false. This changes default behavior from writing an index.zip to writing
       a regular directory. This is safe to do even during a rolling update, because
       the older code actually already handled unzipped directories being present
       on local deep storage.
    
    2) In LocalDataSegmentPuller and LocalDataSegmentPusher, use hard links
       instead of copies when possible. (Generally this is possible when the
       source and destination directory are on the same filesystem.)
---
 .../apache/druid/java/util/common/FileUtils.java   |  26 ++++
 .../druid/java/util/common/FileUtilsTest.java      |  41 +++++++
 docs/dependencies/deep-storage.md                  |  45 +++++--
 .../druid/storage/hdfs/HdfsDataSegmentPusher.java  |   2 -
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   5 +-
 .../segment/loading/LocalDataSegmentKiller.java    |  15 ++-
 .../segment/loading/LocalDataSegmentPuller.java    |  68 ++++++-----
 .../segment/loading/LocalDataSegmentPusher.java    | 106 +++++++++++++----
 .../loading/LocalDataSegmentPusherConfig.java      |   8 ++
 .../loading/LocalDataSegmentKillerTest.java        |  88 +++++++++++---
 .../loading/LocalDataSegmentPusherTest.java        | 132 +++++++++++++++++++--
 website/.spelling                                  |   2 +
 14 files changed, 440 insertions(+), 100 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java 
b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 2ca3944d84..d2c7a6d1a9 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -53,6 +53,12 @@ import java.util.UUID;
 
 public class FileUtils
 {
+  public enum LinkOrCopyResult
+  {
+    LINK,
+    COPY
+  }
+
   /**
    * Useful for retry functionality that doesn't want to stop Throwables, but 
does want to retry on Exceptions
    */
@@ -461,6 +467,26 @@ public class FileUtils
     org.apache.commons.io.FileUtils.deleteDirectory(directory);
   }
 
+  /**
+   * Hard-link "src" as "dest", if possible. If not possible -- perhaps they 
are on separate filesystems -- then
+   * copy "src" to "dest".
+   *
+   * @return whether a link or copy was made. Can be safely ignored if you 
don't care.
+   *
+   * @throws IOException if something went wrong
+   */
+  public static LinkOrCopyResult linkOrCopy(final File src, final File dest) 
throws IOException
+  {
+    try {
+      Files.createLink(dest.toPath(), src.toPath());
+      return LinkOrCopyResult.LINK;
+    }
+    catch (IOException e) {
+      Files.copy(src.toPath(), dest.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+      return LinkOrCopyResult.COPY;
+    }
+  }
+
   public interface OutputStreamConsumer<T>
   {
     T apply(OutputStream outputStream) throws IOException;
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java 
b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
index f76f2e53d2..86909a2621 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
@@ -244,4 +244,45 @@ public class FileUtilsTest
     Assert.assertEquals(data.length(), result);
     Assert.assertEquals(data, 
StringUtils.fromUtf8(Files.readAllBytes(dstFile.toPath())));
   }
+
+  @Test
+  public void testLinkOrCopy1() throws IOException
+  {
+    // Will be a LINK.
+
+    final File fromFile = temporaryFolder.newFile();
+    final File toDir = temporaryFolder.newFolder();
+    final File toFile = new File(toDir, "toFile");
+
+    Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
+    final FileUtils.LinkOrCopyResult linkOrCopyResult = 
FileUtils.linkOrCopy(fromFile, toFile);
+
+    // Verify the new link.
+    Assert.assertEquals(FileUtils.LinkOrCopyResult.LINK, linkOrCopyResult);
+    Assert.assertEquals("foo", 
StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+
+    // Verify they are actually the same file.
+    Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
+    Assert.assertEquals("bar", 
StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+  }
+
+  @Test
+  public void testLinkOrCopy2() throws IOException
+  {
+    // Will be a COPY, because the destination file already exists and 
therefore Files.createLink fails.
+
+    final File fromFile = temporaryFolder.newFile();
+    final File toFile = temporaryFolder.newFile();
+
+    Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
+    final FileUtils.LinkOrCopyResult linkOrCopyResult = 
FileUtils.linkOrCopy(fromFile, toFile);
+
+    // Verify the new copy.
+    Assert.assertEquals(FileUtils.LinkOrCopyResult.COPY, linkOrCopyResult);
+    Assert.assertEquals("foo", 
StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+
+    // Verify they are not the same file.
+    Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
+    Assert.assertEquals("foo", 
StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+  }
 }
diff --git a/docs/dependencies/deep-storage.md 
b/docs/dependencies/deep-storage.md
index 77ae9b27da..b63f968bf5 100644
--- a/docs/dependencies/deep-storage.md
+++ b/docs/dependencies/deep-storage.md
@@ -25,29 +25,52 @@ title: "Deep storage"
 
 Deep storage is where segments are stored.  It is a storage mechanism that 
Apache Druid does not provide.  This deep storage infrastructure defines the 
level of durability of your data, as long as Druid processes can see this 
storage infrastructure and get at the segments stored on it, you will not lose 
data no matter how many Druid nodes you lose.  If segments disappear from this 
storage layer, then you will lose whatever data those segments represented.
 
-## Local Mount
+## Local
 
-A local mount can be used for storage of segments as well.  This allows you to 
use just your local file system or anything else that can be mount locally like 
NFS, Ceph, etc.  This is the default deep storage implementation.
+Local storage is intended for use in the following situations:
 
-In order to use a local mount for deep storage, you need to set the following 
configuration in your common configs.
+- You have just one server.
+- Or, you have multiple servers, and they all have access to a shared 
filesystem (for example: NFS).
+
+In multi-server production clusters, rather than local storage with a shared 
filesystem, it is instead recommended to
+use cloud-based deep storage ([Amazon S3](#amazon-s3-or-s3-compatible), 
[Google Cloud Storage](#google-cloud-storage),
+or [Azure Blob Storage](#azure-blob-storage)), S3-compatible storage (like 
Minio), or [HDFS](#hdfs). These options are
+generally more convenient, more scalable, and more robust than setting up a 
shared filesystem.
+
+The following configurations in `common.runtime.properties` apply to local 
storage:
 
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
-|`druid.storage.type`|local||Must be set.|
-|`druid.storage.storageDirectory`||Directory for storing segments.|Must be 
set.|
+|`druid.storage.type`|`local`||Must be set.|
+|`druid.storage.storageDirectory`|any local directory|Directory for storing 
segments. Must be different from `druid.segmentCache.locations` and 
`druid.segmentCache.infoDir`.|`/tmp/druid/localStorage`|
+|`druid.storage.zip`|`true`, `false`|Whether segments in 
`druid.storage.storageDirectory` are written as directories (`false`) or zip 
files (`true`).|`false`|
+
+For example:
+
+```
+druid.storage.type=local
+druid.storage.storageDirectory=/tmp/druid/localStorage
+```
+
+The `druid.storage.storageDirectory` must be set to a different path than 
`druid.segmentCache.locations` or
+`druid.segmentCache.infoDir`.
+
+## Amazon S3 or S3-compatible
+
+See [`druid-s3-extensions`](../development/extensions-core/s3.md).
 
-Note that you should generally set `druid.storage.storageDirectory` to 
something different from `druid.segmentCache.locations` and 
`druid.segmentCache.infoDir`.
+## Google Cloud Storage
 
-If you are using the Hadoop indexer in local mode, then just give it a local 
file as your output directory and it will work.
+See [`druid-google-extensions`](../development/extensions-core/google.md).
 
-## S3-compatible
+## Azure Blob Storage
 
-See [druid-s3-extensions extension 
documentation](../development/extensions-core/s3.md).
+See [`druid-azure-extensions`](../development/extensions-core/azure.md).
 
 ## HDFS
 
 See [druid-hdfs-storage extension 
documentation](../development/extensions-core/hdfs.md).
 
-## Additional Deep Stores
+## Additional options
 
-For additional deep stores, please see our [extensions 
list](../development/extensions.md).
+For additional deep storage options, please see our [extensions 
list](../development/extensions.md).
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index e262b40da7..280483c11d 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -54,7 +54,6 @@ public class HdfsDataSegmentPusher implements 
DataSegmentPusher
   private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
 
   private final Configuration hadoopConfig;
-  private final ObjectMapper jsonMapper;
 
   // We lazily initialize fullQualifiedStorageDirectory to avoid potential 
issues with Hadoop namenode HA.
   // Please see https://github.com/apache/druid/pull/5684
@@ -68,7 +67,6 @@ public class HdfsDataSegmentPusher implements 
DataSegmentPusher
   )
   {
     this.hadoopConfig = hadoopConfig;
-    this.jsonMapper = jsonMapper;
     Path storageDir = new Path(config.getStorageDirectory());
     this.fullyQualifiedStorageDirectory = Suppliers.memoize(
         () -> {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a83bbaf515..3a9000c3d6 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -3159,6 +3159,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     };
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new 
LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
+    dataSegmentPusherConfig.zip = true;
     final DataSegmentPusher dataSegmentPusher = new 
LocalDataSegmentPusher(dataSegmentPusherConfig);
 
     toolboxFactory = new TaskToolboxFactory(
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 553b601f7f..99f939433f 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -3124,6 +3124,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     };
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new 
LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
+    dataSegmentPusherConfig.zip = true;
     final DataSegmentPusher dataSegmentPusher = new 
LocalDataSegmentPusher(dataSegmentPusherConfig);
 
     toolboxFactory = new TaskToolboxFactory(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 443f2c6d1e..fd2b68ef39 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
@@ -162,6 +163,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -943,7 +945,8 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
     List<File> segmentFiles = new ArrayList<>();
     for (DataSegment segment : 
mdc.retrieveUnusedSegmentsForInterval("test_kill_task", 
Intervals.of("2011-04-01/P4D"))) {
       File file = new File((String) segment.getLoadSpec().get("path"));
-      FileUtils.mkdirp(file);
+      FileUtils.mkdirp(file.getParentFile());
+      Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
       segmentFiles.add(file);
     }
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
index f7156073cf..923467c65b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.IOException;
 
 /**
+ *
  */
 public class LocalDataSegmentKiller implements DataSegmentKiller
 {
@@ -51,9 +52,12 @@ public class LocalDataSegmentKiller implements 
DataSegmentKiller
     log.info("Deleting segment[%s] from directory[%s].", segment.getId(), 
path);
 
     try {
-      if (path.getName().endsWith(".zip")) {
+      if ((path.getName().endsWith(".zip") && path.isFile()) ||
+          (path.getName().equals(LocalDataSegmentPusher.INDEX_DIR) && 
path.isDirectory())) {
         // path format -- > 
.../dataSource/interval/version/partitionNum/xxx.zip
         // or .../dataSource/interval/version/partitionNum/UUID/xxx.zip
+        // or -- > .../dataSource/interval/version/partitionNum/index/
+        // or .../dataSource/interval/version/partitionNum/UUID/index/
 
         File parentDir = path.getParentFile();
         FileUtils.deleteDirectory(parentDir);
@@ -62,13 +66,18 @@ public class LocalDataSegmentKiller implements 
DataSegmentKiller
         parentDir = parentDir.getParentFile();
         int maxDepth = 4; // if for some reason there's no datasSource 
directory, stop recursing somewhere reasonable
         while (parentDir != null && --maxDepth >= 0) {
-          if (!parentDir.delete() || 
segment.getDataSource().equals(parentDir.getName())) {
+          // parentDir.listFiles().length > 0 check not strictly necessary, because parentDir.delete() fails on
+          // nonempty directories. However, including it here is nice since it makes our intent very clear (only
+          // remove empty directories) and it prevents making delete syscalls that are doomed to failure.
+          if (parentDir.listFiles().length > 0
+              || !parentDir.delete()
+              || segment.getDataSource().equals(parentDir.getName())) {
             break;
           }
 
           parentDir = parentDir.getParentFile();
         }
-      } else {
+      } else if (path.exists()) {
         throw new SegmentLoadingException("Unknown file type[%s]", path);
       }
     }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
index 5f63557f98..0f27dac9e1 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
@@ -43,6 +43,7 @@ import java.net.URI;
 import java.nio.charset.Charset;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
 
@@ -125,40 +126,43 @@ public class LocalDataSegmentPuller implements 
URIDataPuller
   public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final 
File dir) throws SegmentLoadingException
   {
     if (sourceFile.isDirectory()) {
-      if (sourceFile.equals(dir)) {
-        log.info("Asked to load [%s] into itself, done!", dir);
-        return new FileUtils.FileCopyResult(sourceFile);
-      }
+      try {
+        final File[] files = sourceFile.listFiles();
+        if (files == null) {
+          throw new SegmentLoadingException("No files found in [%s]", 
sourceFile.getAbsolutePath());
+        }
 
-      final File[] files = sourceFile.listFiles();
-      if (files == null) {
-        throw new SegmentLoadingException("No files found in [%s]", 
sourceFile.getAbsolutePath());
-      }
-      final FileUtils.FileCopyResult result = new 
FileUtils.FileCopyResult(sourceFile);
-      for (final File oldFile : files) {
-        if (oldFile.isDirectory()) {
-          log.info("[%s] is a child directory, skipping", 
oldFile.getAbsolutePath());
-          continue;
+        if (sourceFile.equals(dir)) {
+          log.info("Asked to load [%s] into itself, done!", dir);
+          return new FileUtils.FileCopyResult(Arrays.asList(files));
         }
 
-        result.addFiles(
-            FileUtils.retryCopy(
-                Files.asByteSource(oldFile),
-                new File(dir, oldFile.getName()),
-                shouldRetryPredicate(),
-                DEFAULT_RETRY_COUNT
-            ).getFiles()
+        final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
+        boolean link = true;
+        for (final File oldFile : files) {
+          if (oldFile.isDirectory()) {
+            log.info("[%s] is a child directory, skipping", 
oldFile.getAbsolutePath());
+            continue;
+          }
+
+          final File newFile = new File(dir, oldFile.getName());
+          final FileUtils.LinkOrCopyResult linkOrCopyResult = 
FileUtils.linkOrCopy(oldFile, newFile);
+          link = link && linkOrCopyResult == FileUtils.LinkOrCopyResult.LINK;
+          result.addFile(newFile);
+        }
+        log.info(
+            "%s %d bytes from [%s] to [%s]",
+            link ? "Linked" : "Copied",
+            result.size(),
+            sourceFile.getAbsolutePath(),
+            dir.getAbsolutePath()
         );
+        return result;
       }
-      log.info(
-          "Copied %d bytes from [%s] to [%s]",
-          result.size(),
-          sourceFile.getAbsolutePath(),
-          dir.getAbsolutePath()
-      );
-      return result;
-    }
-    if (CompressionUtils.isZip(sourceFile.getName())) {
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "Unable to load from local 
directory [%s]", sourceFile.getAbsolutePath());
+      }
+    } else if (CompressionUtils.isZip(sourceFile.getName())) {
       try {
         final FileUtils.FileCopyResult result = CompressionUtils.unzip(
             Files.asByteSource(sourceFile),
@@ -177,8 +181,7 @@ public class LocalDataSegmentPuller implements URIDataPuller
       catch (IOException e) {
         throw new SegmentLoadingException(e, "Unable to unzip file [%s]", 
sourceFile.getAbsolutePath());
       }
-    }
-    if (CompressionUtils.isGz(sourceFile.getName())) {
+    } else if (CompressionUtils.isGz(sourceFile.getName())) {
       final File outFile = new File(dir, 
CompressionUtils.getGzBaseName(sourceFile.getName()));
       final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
           Files.asByteSource(sourceFile),
@@ -192,8 +195,9 @@ public class LocalDataSegmentPuller implements URIDataPuller
           outFile.getAbsolutePath()
       );
       return result;
+    } else {
+      throw new SegmentLoadingException("Do not know how to handle source 
[%s]", sourceFile.getAbsolutePath());
     }
-    throw new SegmentLoadingException("Do not know how to handle source [%s]", 
sourceFile.getAbsolutePath());
   }
 
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
index 07222ff0b3..a783d5c31b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
@@ -31,6 +32,8 @@ import org.apache.druid.utils.CompressionUtils;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.Map;
 import java.util.UUID;
 
@@ -38,7 +41,8 @@ public class LocalDataSegmentPusher implements 
DataSegmentPusher
 {
   private static final Logger log = new Logger(LocalDataSegmentPusher.class);
 
-  private static final String INDEX_FILENAME = "index.zip";
+  public static final String INDEX_DIR = "index";
+  public static final String INDEX_ZIP_FILENAME = "index.zip";
 
   private final LocalDataSegmentPusherConfig config;
 
@@ -76,7 +80,11 @@ public class LocalDataSegmentPusher implements 
DataSegmentPusher
 
     log.debug("Copying segment[%s] to local filesystem at location[%s]", 
segment.getId(), outDir.toString());
 
+    // Add binary version to the DataSegment object.
+    segment = 
segment.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+
     if (dataSegmentFile.equals(outDir)) {
+      // Input and output directories are the same. Compute size, build a 
loadSpec, and return.
       long size = 0;
       for (File file : dataSegmentFile.listFiles()) {
         size += file.length();
@@ -85,19 +93,35 @@ public class LocalDataSegmentPusher implements 
DataSegmentPusher
       return segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
                     .withSize(size)
                     
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+    } else if (config.isZip()) {
+      return pushZip(dataSegmentFile, outDir, segment);
+    } else {
+      return pushNoZip(dataSegmentFile, outDir, segment);
     }
+  }
 
-    final File tmpOutDir = new File(config.getStorageDirectory(), 
makeIntermediateDir());
-    log.debug("Creating intermediate directory[%s] for segment[%s].", 
tmpOutDir.toString(), segment.getId());
-    FileUtils.mkdirp(tmpOutDir);
+  @Override
+  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  {
+    return ImmutableMap.of("type", "local", "path", 
finalIndexZipFilePath.getPath());
+  }
 
-    try {
-      final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
-      final long size = compressSegment(dataSegmentFile, tmpIndexFile);
+  private String makeIntermediateDir()
+  {
+    return "intermediate_pushes/" + UUID.randomUUID();
+  }
 
-      final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new 
File(outDir, INDEX_FILENAME).toURI()))
-                                             .withSize(size)
-                                             
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+  private DataSegment pushZip(final File inDir, final File outDir, final 
DataSegment baseSegment) throws IOException
+  {
+    final File tmpSegmentDir = new File(config.getStorageDirectory(), 
makeIntermediateDir());
+    final File tmpIndexFile = new File(tmpSegmentDir, INDEX_ZIP_FILENAME);
+
+    log.debug("Creating intermediate directory[%s] for segment[%s].", 
tmpSegmentDir.toString(), baseSegment.getId());
+    FileUtils.mkdirp(tmpSegmentDir);
+
+    try {
+      log.debug("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);
+      final long size = CompressionUtils.zip(inDir, tmpIndexFile, true);
 
       FileUtils.mkdirp(outDir);
       final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
@@ -106,27 +130,61 @@ public class LocalDataSegmentPusher implements 
DataSegmentPusher
         throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, 
indexFileTarget);
       }
 
-      return dataSegment;
+      return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, 
INDEX_ZIP_FILENAME).toURI()))
+                        .withSize(size);
     }
     finally {
-      FileUtils.deleteDirectory(tmpOutDir);
+      FileUtils.deleteDirectory(tmpSegmentDir);
     }
   }
 
-  @Override
-  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  private DataSegment pushNoZip(final File inDir, final File outDir, final 
DataSegment baseSegment) throws IOException
   {
-    return ImmutableMap.of("type", "local", "path", 
finalIndexZipFilePath.getPath());
-  }
+    final File tmpSegmentDir = new File(config.getStorageDirectory(), 
makeIntermediateDir());
+    FileUtils.mkdirp(tmpSegmentDir);
 
-  private String makeIntermediateDir()
-  {
-    return "intermediate_pushes/" + UUID.randomUUID();
-  }
+    try {
+      final File[] files = inDir.listFiles();
+      if (files == null) {
+        throw new IOE("Cannot list directory [%s]", inDir);
+      }
 
-  private long compressSegment(File dataSegmentFile, File dest) throws 
IOException
-  {
-    log.debug("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
-    return CompressionUtils.zip(dataSegmentFile, dest, true);
+      long size = 0;
+      for (final File file : files) {
+        if (file.isFile()) {
+          size += file.length();
+          FileUtils.linkOrCopy(file, new File(tmpSegmentDir, file.getName()));
+        } else {
+          // Segment directories are expected to be flat.
+          throw new IOE("Unexpected subdirectory [%s]", file.getName());
+        }
+      }
+
+      final File segmentDir = new File(outDir, INDEX_DIR);
+      FileUtils.mkdirp(outDir);
+
+      try {
+        Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
+      }
+      catch (IOException e) {
+        if (segmentDir.exists()) {
+          // Move old directory out of the way, then try again. This makes the 
latest push win when we push to the
+          // same directory twice, so behavior is compatible with the zip 
style of pushing.
+          Files.move(
+              segmentDir.toPath(),
+              new File(outDir, StringUtils.format("%s_old_%s", INDEX_DIR, 
UUID.randomUUID())).toPath(),
+              StandardCopyOption.ATOMIC_MOVE
+          );
+
+          Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
+        }
+      }
+
+      return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, 
INDEX_DIR).toURI()))
+                        .withSize(size);
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpSegmentDir);
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
index 15868da4de..e539ad20af 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
@@ -30,8 +30,16 @@ public class LocalDataSegmentPusherConfig
   @JsonProperty
   public File storageDirectory = new File("/tmp/druid/localStorage");
 
+  @JsonProperty
+  public boolean zip = false;
+
   public File getStorageDirectory()
   {
     return storageDirectory;
   }
+
+  public boolean isZip()
+  {
+    return zip;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
index bc17677f36..2dd6667165 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
@@ -29,13 +29,30 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
 
+@RunWith(Parameterized.class)
 public class LocalDataSegmentKillerTest
 {
+  private static final String DATASOURCE_NAME = "ds";
+
+  private final boolean zip;
+
+  public LocalDataSegmentKillerTest(boolean zip)
+  {
+    this.zip = zip;
+  }
+
+  @Parameterized.Parameters(name = "zip = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(new Object[]{false}, new Object[]{true});
+  }
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -46,12 +63,12 @@ public class LocalDataSegmentKillerTest
     LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new 
LocalDataSegmentPusherConfig());
 
     // Create following segments and then delete them in this order and assert 
directory deletions
-    // /tmp/dataSource/interval1/v1/0/index.zip
-    // /tmp/dataSource/interval1/v1/1/index.zip
-    // /tmp/dataSource/interval1/v2/0/index.zip
-    // /tmp/dataSource/interval2/v1/0/index.zip
+    // /tmp/dataSource/interval1/v1/0/
+    // /tmp/dataSource/interval1/v1/1/
+    // /tmp/dataSource/interval1/v2/0/
+    // /tmp/dataSource/interval2/v1/0/
 
-    final File dataSourceDir = temporaryFolder.newFolder();
+    final File dataSourceDir = temporaryFolder.newFolder(DATASOURCE_NAME);
 
     File interval1Dir = new File(dataSourceDir, "interval1");
     File version11Dir = new File(interval1Dir, "v1");
@@ -72,27 +89,28 @@ public class LocalDataSegmentKillerTest
 
     makePartitionDirWithIndex(partition012Dir);
 
-    killer.kill(getSegmentWithPath(new File(partition011Dir, 
"index.zip").toString()));
+    killer.kill(getSegmentWithPath(partition011Dir));
 
     Assert.assertFalse(partition011Dir.exists());
     Assert.assertTrue(partition111Dir.exists());
     Assert.assertTrue(partition021Dir.exists());
     Assert.assertTrue(partition012Dir.exists());
 
-    killer.kill(getSegmentWithPath(new File(partition111Dir, 
"index.zip").toString()));
+    killer.kill(getSegmentWithPath(partition111Dir));
 
     Assert.assertFalse(version11Dir.exists());
     Assert.assertTrue(partition021Dir.exists());
     Assert.assertTrue(partition012Dir.exists());
 
-    killer.kill(getSegmentWithPath(new File(partition021Dir, 
"index.zip").toString()));
+    killer.kill(getSegmentWithPath(partition021Dir));
 
     Assert.assertFalse(interval1Dir.exists());
     Assert.assertTrue(partition012Dir.exists());
 
-    killer.kill(getSegmentWithPath(new File(partition012Dir, 
"index.zip").toString()));
+    killer.kill(getSegmentWithPath(partition012Dir));
 
     Assert.assertFalse(dataSourceDir.exists());
+    Assert.assertTrue(dataSourceDir.getParentFile().exists());
   }
 
   @Test
@@ -100,7 +118,8 @@ public class LocalDataSegmentKillerTest
   {
     final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new 
LocalDataSegmentPusherConfig());
     final String uuid = UUID.randomUUID().toString().substring(0, 5);
-    final File dataSourceDir = temporaryFolder.newFolder("dataSource");
+    final File emptyParentDir = temporaryFolder.newFolder();
+    final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME);
     final File intervalDir = new File(dataSourceDir, "interval");
     final File versionDir = new File(intervalDir, "1");
     final File partitionDir = new File(versionDir, "0");
@@ -108,30 +127,69 @@ public class LocalDataSegmentKillerTest
 
     makePartitionDirWithIndex(uuidDir);
 
-    killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString()));
+    killer.kill(getSegmentWithPath(uuidDir));
 
     Assert.assertFalse(uuidDir.exists());
     Assert.assertFalse(partitionDir.exists());
     Assert.assertFalse(versionDir.exists());
     Assert.assertFalse(intervalDir.exists());
     Assert.assertFalse(dataSourceDir.exists());
+
+    // Verify that we stop after the datasource dir, even though the parent is empty.
+    Assert.assertTrue(emptyParentDir.exists());
+    Assert.assertEquals(0, emptyParentDir.listFiles().length);
+  }
+
+  @Test
+  public void testKillUniquePathWrongDataSourceNameInDirectory() throws 
Exception
+  {
+    // Verify that the killer stops pruning parent directories even when the datasource directory name is wrong.
+    final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new 
LocalDataSegmentPusherConfig());
+    final String uuid = UUID.randomUUID().toString().substring(0, 5);
+    final File emptyParentDir = temporaryFolder.newFolder();
+    final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME + 
"_wrong");
+    final File intervalDir = new File(dataSourceDir, "interval");
+    final File versionDir = new File(intervalDir, "1");
+    final File partitionDir = new File(versionDir, "0");
+    final File uuidDir = new File(partitionDir, uuid);
+
+    makePartitionDirWithIndex(uuidDir);
+
+    killer.kill(getSegmentWithPath(uuidDir));
+
+    Assert.assertFalse(uuidDir.exists());
+    Assert.assertFalse(partitionDir.exists());
+    Assert.assertFalse(versionDir.exists());
+    Assert.assertFalse(intervalDir.exists());
+    Assert.assertFalse(dataSourceDir.exists());
+
+    // Verify that we stop at 4 pruned paths, even if we don't encounter the datasource-named directory.
+    Assert.assertTrue(emptyParentDir.exists());
+    Assert.assertEquals(0, emptyParentDir.listFiles().length);
   }
 
   private void makePartitionDirWithIndex(File path) throws IOException
   {
     FileUtils.mkdirp(path);
-    Assert.assertTrue(new File(path, "index.zip").createNewFile());
+
+    if (zip) {
+      Assert.assertTrue(new File(path, 
LocalDataSegmentPusher.INDEX_ZIP_FILENAME).createNewFile());
+    } else {
+      Assert.assertTrue(new File(path, 
LocalDataSegmentPusher.INDEX_DIR).mkdir());
+    }
   }
 
-  private DataSegment getSegmentWithPath(String path)
+  private DataSegment getSegmentWithPath(File baseDirectory)
   {
+    final String fileName = zip ? LocalDataSegmentPusher.INDEX_ZIP_FILENAME : 
LocalDataSegmentPusher.INDEX_DIR;
+    final File path = new File(baseDirectory, fileName);
     return new DataSegment(
-        "dataSource",
+        DATASOURCE_NAME,
         Intervals.of("2000/3000"),
         "ver",
         ImmutableMap.of(
             "type", "local",
-            "path", path
+            "path", path.toURI().getPath()
         ),
         ImmutableList.of("product"),
         ImmutableList.of("visited_sum", "unique_hosts"),
diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
index 739075e5e1..8651ac7a8e 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
@@ -48,7 +48,9 @@ public class LocalDataSegmentPusherTest
   public ExpectedException exception = ExpectedException.none();
 
   LocalDataSegmentPusher localDataSegmentPusher;
+  LocalDataSegmentPusher localDataSegmentPusherZip;
   LocalDataSegmentPusherConfig config;
+  LocalDataSegmentPusherConfig configZip;
   File dataSegmentFiles;
   DataSegment dataSegment = new DataSegment(
       "ds",
@@ -77,14 +79,53 @@ public class LocalDataSegmentPusherTest
   public void setUp() throws IOException
   {
     config = new LocalDataSegmentPusherConfig();
+    config.zip = false;
     config.storageDirectory = temporaryFolder.newFolder();
     localDataSegmentPusher = new LocalDataSegmentPusher(config);
+
+    configZip = new LocalDataSegmentPusherConfig();
+    configZip.zip = true;
+    configZip.storageDirectory = temporaryFolder.newFolder();
+    localDataSegmentPusherZip = new LocalDataSegmentPusher(configZip);
+
     dataSegmentFiles = temporaryFolder.newFolder();
     Files.asByteSink(new File(dataSegmentFiles, 
"version.bin")).write(Ints.toByteArray(0x9));
   }
 
   @Test
-  public void testPush() throws IOException
+  public void testPushZip() throws IOException
+  {
+    /* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
+       File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
+      */
+    final DataSegment dataSegment2 = dataSegment.withVersion("v2");
+
+    DataSegment returnSegment1 = 
localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+    DataSegment returnSegment2 = 
localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment2, false);
+
+    Assert.assertNotNull(returnSegment1);
+    Assert.assertEquals(dataSegment, returnSegment1);
+
+    Assert.assertNotNull(returnSegment2);
+    Assert.assertEquals(dataSegment2, returnSegment2);
+
+    Assert.assertNotEquals(
+        localDataSegmentPusherZip.getStorageDir(dataSegment, false),
+        localDataSegmentPusherZip.getStorageDir(dataSegment2, false)
+    );
+
+    for (DataSegment returnSegment : ImmutableList.of(returnSegment1, 
returnSegment2)) {
+      File outDir = new File(
+          configZip.getStorageDirectory(),
+          localDataSegmentPusherZip.getStorageDir(returnSegment, false)
+      );
+      File versionFile = new File(outDir, "index.zip");
+      Assert.assertTrue(versionFile.exists());
+    }
+  }
+
+  @Test
+  public void testPushNoZip() throws IOException
   {
     /* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
        File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
@@ -107,19 +148,43 @@ public class LocalDataSegmentPusherTest
 
     for (DataSegment returnSegment : ImmutableList.of(returnSegment1, 
returnSegment2)) {
       File outDir = new File(
-          config.getStorageDirectory(),
-          localDataSegmentPusher.getStorageDir(returnSegment, false)
+          new File(
+              config.getStorageDirectory(),
+              localDataSegmentPusher.getStorageDir(returnSegment, false)
+          ),
+          "index"
       );
-      File versionFile = new File(outDir, "index.zip");
+
+      // Check against loadSpec.
+      Assert.assertEquals(
+          outDir.toURI().getPath(),
+          returnSegment.getLoadSpec().get("path")
+      );
+
+      // Check for version.bin.
+      File versionFile = new File(outDir, "version.bin");
       Assert.assertTrue(versionFile.exists());
     }
   }
 
   @Test
-  public void testPushUseUniquePath() throws IOException
+  public void testPushNoZipUseUniquePath() throws IOException
   {
     DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, 
dataSegment, true);
 
+    String path = segment.getLoadSpec().get("path").toString();
+    Pattern pattern = Pattern.compile(
+        
".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index/$"
+    );
+    Assert.assertTrue(path, pattern.matcher(path).matches());
+    Assert.assertTrue(new File(path).exists());
+  }
+
+  @Test
+  public void testPushZipUseUniquePath() throws IOException
+  {
+    DataSegment segment = localDataSegmentPusherZip.push(dataSegmentFiles, 
dataSegment, true);
+
     String path = segment.getLoadSpec().get("path").toString();
     Pattern pattern = Pattern.compile(
         
".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip"
@@ -129,8 +194,12 @@ public class LocalDataSegmentPusherTest
   }
 
   @Test
-  public void testLastPushWinsForConcurrentPushes() throws IOException
+  public void testLastPushWinsForConcurrentNoZipPushes() throws IOException
   {
+    // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
+    // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
+    // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
+
     File replicatedDataSegmentFiles = temporaryFolder.newFolder();
     Files.asByteSink(new File(replicatedDataSegmentFiles, 
"version.bin")).write(Ints.toByteArray(0x8));
     DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, 
dataSegment, false);
@@ -139,10 +208,38 @@ public class LocalDataSegmentPusherTest
     Assert.assertEquals(dataSegment.getDimensions(), 
returnSegment1.getDimensions());
     Assert.assertEquals(dataSegment2.getDimensions(), 
returnSegment2.getDimensions());
 
-    File unzipDir = new File(config.storageDirectory, "unzip");
+    final String expectedPath = StringUtils.format(
+        "%s/%s",
+        config.storageDirectory,
+        "ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index/"
+    );
+
+    Assert.assertEquals(expectedPath, 
returnSegment1.getLoadSpec().get("path"));
+    Assert.assertEquals(expectedPath, 
returnSegment2.getLoadSpec().get("path"));
+
+    final File versionFile = new File(expectedPath, "version.bin");
+    Assert.assertEquals(0x8, 
Ints.fromByteArray(Files.toByteArray(versionFile)));
+  }
+
+  @Test
+  public void testLastPushWinsForConcurrentZipPushes() throws IOException
+  {
+    // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
+    // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
+    // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
+
+    File replicatedDataSegmentFiles = temporaryFolder.newFolder();
+    Files.asByteSink(new File(replicatedDataSegmentFiles, 
"version.bin")).write(Ints.toByteArray(0x8));
+    DataSegment returnSegment1 = 
localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+    DataSegment returnSegment2 = 
localDataSegmentPusherZip.push(replicatedDataSegmentFiles, dataSegment2, false);
+
+    Assert.assertEquals(dataSegment.getDimensions(), 
returnSegment1.getDimensions());
+    Assert.assertEquals(dataSegment2.getDimensions(), 
returnSegment2.getDimensions());
+
+    File unzipDir = new File(configZip.storageDirectory, "unzip");
     FileUtils.mkdirp(unzipDir);
     CompressionUtils.unzip(
-        new File(config.storageDirectory, 
"/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
+        new File(configZip.storageDirectory, 
"/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
         unzipDir
     );
 
@@ -160,27 +257,38 @@ public class LocalDataSegmentPusherTest
     localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
   }
 
+  @Test
+  public void testPushZipCannotCreateDirectory() throws IOException
+  {
+    exception.expect(IOException.class);
+    exception.expectMessage("Cannot create directory");
+    configZip.storageDirectory = new File(configZip.storageDirectory, "xxx");
+    Assert.assertTrue(configZip.storageDirectory.mkdir());
+    configZip.storageDirectory.setWritable(false);
+    localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+  }
+
   @Test
   public void testPathForHadoopAbsolute()
   {
-    config.storageDirectory = new File("/druid");
+    configZip.storageDirectory = new File("/druid");
 
     // If this test fails because the path is returned as "file:/druid/", this can happen
     // when a /druid directory exists on the local filesystem.
     Assert.assertEquals(
         "file:/druid",
-        new LocalDataSegmentPusher(config).getPathForHadoop()
+        new LocalDataSegmentPusher(configZip).getPathForHadoop()
     );
   }
 
   @Test
   public void testPathForHadoopRelative()
   {
-    config.storageDirectory = new File("druid");
+    configZip.storageDirectory = new File("druid");
 
     Assert.assertEquals(
         StringUtils.format("file:%s/druid", System.getProperty("user.dir")),
-        new LocalDataSegmentPusher(config).getPathForHadoop()
+        new LocalDataSegmentPusher(configZip).getPathForHadoop()
     );
   }
 }
diff --git a/website/.spelling b/website/.spelling
index e52ecebbc5..c9562c34df 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -739,6 +739,8 @@ appenders
 druid-hdfs-storage
 druid-s3-extensions
 druid.sql.planner.maxNumericInFilters
+Minio
+multi-server
  - ../docs/dependencies/metadata-storage.md
 BasicDataSource
  - ../docs/dependencies/zookeeper.md


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to