This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new de5a4bafcb Zero-copy local deep storage. (#13394)
de5a4bafcb is described below
commit de5a4bafcbad7c05abd9ca528ab157e6a121dbbf
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Dec 12 17:28:24 2022 -0800
Zero-copy local deep storage. (#13394)
* Zero-copy local deep storage.
This is useful for local deep storage, since it reduces disk usage and
enables Historicals to load segments instantaneously.

Two changes:

1) Introduce a "druid.storage.zip" parameter for local storage, which defaults
to false. This changes the default behavior from writing an index.zip to
writing a regular directory. This is safe to do even during a rolling update,
because the older code already handled unzipped directories being present
on local deep storage.

2) In LocalDataSegmentPuller and LocalDataSegmentPusher, use hard links
instead of copies when possible. (Generally this is possible when the
source and destination directories are on the same filesystem.)
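
For reference, a minimal common.runtime.properties snippet exercising the new
flag might look like this (a sketch mirroring the docs change below; the flag
is optional and defaults to false):

```
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
# Optional: write index.zip files instead of plain segment directories.
druid.storage.zip=false
```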
---
.../apache/druid/java/util/common/FileUtils.java | 26 ++++
.../druid/java/util/common/FileUtilsTest.java | 41 +++++++
docs/dependencies/deep-storage.md | 45 +++++--
.../druid/storage/hdfs/HdfsDataSegmentPusher.java | 2 -
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 5 +-
.../segment/loading/LocalDataSegmentKiller.java | 15 ++-
.../segment/loading/LocalDataSegmentPuller.java | 68 ++++++-----
.../segment/loading/LocalDataSegmentPusher.java | 106 +++++++++++++----
.../loading/LocalDataSegmentPusherConfig.java | 8 ++
.../loading/LocalDataSegmentKillerTest.java | 88 +++++++++++---
.../loading/LocalDataSegmentPusherTest.java | 132 +++++++++++++++++++--
website/.spelling | 2 +
14 files changed, 440 insertions(+), 100 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 2ca3944d84..d2c7a6d1a9 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -53,6 +53,12 @@ import java.util.UUID;
public class FileUtils
{
+ public enum LinkOrCopyResult
+ {
+ LINK,
+ COPY
+ }
+
/**
* Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
*/
@@ -461,6 +467,26 @@ public class FileUtils
org.apache.commons.io.FileUtils.deleteDirectory(directory);
}
+ /**
+ * Hard-link "src" as "dest", if possible. If not possible -- perhaps they are on separate filesystems -- then
+ * copy "src" to "dest".
+ *
+ * @return whether a link or copy was made. Can be safely ignored if you don't care.
+ *
+ * @throws IOException if something went wrong
+ */
+ public static LinkOrCopyResult linkOrCopy(final File src, final File dest) throws IOException
+ {
+ try {
+ Files.createLink(dest.toPath(), src.toPath());
+ return LinkOrCopyResult.LINK;
+ }
+ catch (IOException e) {
+ Files.copy(src.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ return LinkOrCopyResult.COPY;
+ }
+ }
+
public interface OutputStreamConsumer<T>
{
T apply(OutputStream outputStream) throws IOException;
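
As a usage note: the following is a minimal, hypothetical caller of the new helper (the file paths are illustrative and not part of the commit; FileUtils.linkOrCopy and LinkOrCopyResult are the ones added above):

```java
import java.io.File;
import java.io.IOException;

import org.apache.druid.java.util.common.FileUtils;

public class LinkOrCopyExample
{
  public static void main(String[] args) throws IOException
  {
    // Illustrative paths; any source file and destination will do.
    final File src = new File("/tmp/segments/00000.smoosh");
    final File dest = new File("/tmp/cache/00000.smoosh");

    // linkOrCopy tries Files.createLink first; on any IOException (separate
    // filesystems, destination already exists, etc.) it falls back to a copy.
    final FileUtils.LinkOrCopyResult result = FileUtils.linkOrCopy(src, dest);

    if (result == FileUtils.LinkOrCopyResult.LINK) {
      // Hard link: src and dest share one inode, so no extra disk space is used.
      System.out.println("Linked " + src + " -> " + dest);
    } else {
      System.out.println("Copied " + src + " -> " + dest);
    }
  }
}
```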
diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
index f76f2e53d2..86909a2621 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
@@ -244,4 +244,45 @@ public class FileUtilsTest
Assert.assertEquals(data.length(), result);
Assert.assertEquals(data, StringUtils.fromUtf8(Files.readAllBytes(dstFile.toPath())));
}
+
+ @Test
+ public void testLinkOrCopy1() throws IOException
+ {
+ // Will be a LINK.
+
+ final File fromFile = temporaryFolder.newFile();
+ final File toDir = temporaryFolder.newFolder();
+ final File toFile = new File(toDir, "toFile");
+
+ Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
+ final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);
+
+ // Verify the new link.
+ Assert.assertEquals(FileUtils.LinkOrCopyResult.LINK, linkOrCopyResult);
+ Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+
+ // Verify they are actually the same file.
+ Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
+ Assert.assertEquals("bar", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+ }
+
+ @Test
+ public void testLinkOrCopy2() throws IOException
+ {
+ // Will be a COPY, because the destination file already exists and therefore Files.createLink fails.
+
+ final File fromFile = temporaryFolder.newFile();
+ final File toFile = temporaryFolder.newFile();
+
+ Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
+ final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);
+
+ // Verify the new copy.
+ Assert.assertEquals(FileUtils.LinkOrCopyResult.COPY, linkOrCopyResult);
+ Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+
+ // Verify they are not the same file.
+ Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
+ Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
+ }
}
diff --git a/docs/dependencies/deep-storage.md b/docs/dependencies/deep-storage.md
index 77ae9b27da..b63f968bf5 100644
--- a/docs/dependencies/deep-storage.md
+++ b/docs/dependencies/deep-storage.md
@@ -25,29 +25,52 @@ title: "Deep storage"
Deep storage is where segments are stored. It is a storage mechanism that Apache Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid processes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented.
-## Local Mount
+## Local
-A local mount can be used for storage of segments as well. This allows you to use just your local file system or anything else that can be mount locally like NFS, Ceph, etc. This is the default deep storage implementation.
+Local storage is intended for use in the following situations:
-In order to use a local mount for deep storage, you need to set the following configuration in your common configs.
+- You have just one server.
+- Or, you have multiple servers, and they all have access to a shared filesystem (for example: NFS).
+
+In multi-server production clusters, rather than local storage with a shared filesystem, it is instead recommended to
+use cloud-based deep storage ([Amazon S3](#amazon-s3-or-s3-compatible), [Google Cloud Storage](#google-cloud-storage),
+or [Azure Blob Storage](#azure-blob-storage)), S3-compatible storage (like Minio), or [HDFS](#hdfs). These options are
+generally more convenient, more scalable, and more robust than setting up a shared filesystem.
+
+The following configurations in `common.runtime.properties` apply to local storage:
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
-|`druid.storage.type`|local||Must be set.|
-|`druid.storage.storageDirectory`||Directory for storing segments.|Must be set.|
+|`druid.storage.type`|`local`||Must be set.|
+|`druid.storage.storageDirectory`|any local directory|Directory for storing segments. Must be different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.|`/tmp/druid/localStorage`|
+|`druid.storage.zip`|`true`, `false`|Whether segments in `druid.storage.storageDirectory` are written as directories (`false`) or zip files (`true`).|`false`|
+
+For example:
+
+```
+druid.storage.type=local
+druid.storage.storageDirectory=/tmp/druid/localStorage
+```
+
+The `druid.storage.storageDirectory` must be set to a different path than `druid.segmentCache.locations` or
+`druid.segmentCache.infoDir`.
+
+## Amazon S3 or S3-compatible
+
+See [`druid-s3-extensions`](../development/extensions-core/s3.md).
-Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.
+## Google Cloud Storage
-If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.
+See [`druid-google-extensions`](../development/extensions-core/google.md).
-## S3-compatible
+## Azure Blob Storage
-See [druid-s3-extensions extension documentation](../development/extensions-core/s3.md).
+See [`druid-azure-extensions`](../development/extensions-core/azure.md).
## HDFS
See [druid-hdfs-storage extension documentation](../development/extensions-core/hdfs.md).
-## Additional Deep Stores
+## Additional options
-For additional deep stores, please see our [extensions list](../development/extensions.md).
+For additional deep storage options, please see our [extensions list](../development/extensions.md).
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index e262b40da7..280483c11d 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -54,7 +54,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
private final Configuration hadoopConfig;
- private final ObjectMapper jsonMapper;
// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/apache/druid/pull/5684
@@ -68,7 +67,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
)
{
this.hadoopConfig = hadoopConfig;
- this.jsonMapper = jsonMapper;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a83bbaf515..3a9000c3d6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -3159,6 +3159,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
+ dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 553b601f7f..99f939433f 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -3124,6 +3124,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
+ dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 443f2c6d1e..fd2b68ef39 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
@@ -162,6 +163,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -943,7 +945,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
List<File> segmentFiles = new ArrayList<>();
for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
File file = new File((String) segment.getLoadSpec().get("path"));
- FileUtils.mkdirp(file);
+ FileUtils.mkdirp(file.getParentFile());
+ Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
segmentFiles.add(file);
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
index f7156073cf..923467c65b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
/**
+ *
*/
public class LocalDataSegmentKiller implements DataSegmentKiller
{
@@ -51,9 +52,12 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
log.info("Deleting segment[%s] from directory[%s].", segment.getId(), path);
try {
- if (path.getName().endsWith(".zip")) {
+ if ((path.getName().endsWith(".zip") && path.isFile()) ||
+ (path.getName().equals(LocalDataSegmentPusher.INDEX_DIR) && path.isDirectory())) {
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
// or .../dataSource/interval/version/partitionNum/UUID/xxx.zip
+ // or -- > .../dataSource/interval/version/partitionNum/index/
+ // or .../dataSource/interval/version/partitionNum/UUID/index/
File parentDir = path.getParentFile();
FileUtils.deleteDirectory(parentDir);
@@ -62,13 +66,18 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
parentDir = parentDir.getParentFile();
int maxDepth = 4; // if for some reason there's no dataSource directory, stop recursing somewhere reasonable
while (parentDir != null && --maxDepth >= 0) {
- if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) {
+ // parentDir.listFiles().length > 0 check not strictly necessary, because parentDir.delete() fails on
+ // nonempty directories. However, including it here is nice since it makes our intent very clear (only
+ // remove empty directories) and it prevents making delete syscalls that are doomed to failure.
+ if (parentDir.listFiles().length > 0
+ || !parentDir.delete()
+ || segment.getDataSource().equals(parentDir.getName())) {
break;
}
parentDir = parentDir.getParentFile();
}
- } else {
+ } else if (path.exists()) {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
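
To make the pruning behavior concrete, here is a standalone sketch of the upward walk the killer performs after deleting a segment (a simplified, hypothetical helper based on the hunk above, not code from the commit):

```java
import java.io.File;

public class PruneEmptyParentsExample
{
  // Walk upward from the deleted segment's parent directory, removing empty
  // directories. Stop at the first nonempty directory, after deleting the
  // datasource-named directory, or after four levels, as in LocalDataSegmentKiller.
  static void pruneEmptyParents(File parentDir, String dataSource)
  {
    int maxDepth = 4;
    while (parentDir != null && --maxDepth >= 0) {
      final File[] children = parentDir.listFiles();
      if (children == null
          || children.length > 0
          || !parentDir.delete()
          || dataSource.equals(parentDir.getName())) {
        break;
      }
      parentDir = parentDir.getParentFile();
    }
  }
}
```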
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
index 5f63557f98..0f27dac9e1 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
@@ -43,6 +43,7 @@ import java.net.URI;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CancellationException;
@@ -125,40 +126,43 @@ public class LocalDataSegmentPuller implements URIDataPuller
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
- if (sourceFile.equals(dir)) {
- log.info("Asked to load [%s] into itself, done!", dir);
- return new FileUtils.FileCopyResult(sourceFile);
- }
+ try {
+ final File[] files = sourceFile.listFiles();
+ if (files == null) {
+ throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
+ }
- final File[] files = sourceFile.listFiles();
- if (files == null) {
- throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
- }
- final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile);
- for (final File oldFile : files) {
- if (oldFile.isDirectory()) {
- log.info("[%s] is a child directory, skipping",
oldFile.getAbsolutePath());
- continue;
+ if (sourceFile.equals(dir)) {
+ log.info("Asked to load [%s] into itself, done!", dir);
+ return new FileUtils.FileCopyResult(Arrays.asList(files));
}
- result.addFiles(
- FileUtils.retryCopy(
- Files.asByteSource(oldFile),
- new File(dir, oldFile.getName()),
- shouldRetryPredicate(),
- DEFAULT_RETRY_COUNT
- ).getFiles()
+ final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
+ boolean link = true;
+ for (final File oldFile : files) {
+ if (oldFile.isDirectory()) {
+ log.info("[%s] is a child directory, skipping",
oldFile.getAbsolutePath());
+ continue;
+ }
+
+ final File newFile = new File(dir, oldFile.getName());
+ final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(oldFile, newFile);
+ link = link && linkOrCopyResult == FileUtils.LinkOrCopyResult.LINK;
+ result.addFile(newFile);
+ }
+ log.info(
+ "%s %d bytes from [%s] to [%s]",
+ link ? "Linked" : "Copied",
+ result.size(),
+ sourceFile.getAbsolutePath(),
+ dir.getAbsolutePath()
);
+ return result;
}
- log.info(
- "Copied %d bytes from [%s] to [%s]",
- result.size(),
- sourceFile.getAbsolutePath(),
- dir.getAbsolutePath()
- );
- return result;
- }
- if (CompressionUtils.isZip(sourceFile.getName())) {
+ catch (IOException e) {
+ throw new SegmentLoadingException(e, "Unable to load from local directory [%s]", sourceFile.getAbsolutePath());
+ }
+ } else if (CompressionUtils.isZip(sourceFile.getName())) {
try {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Files.asByteSource(sourceFile),
@@ -177,8 +181,7 @@ public class LocalDataSegmentPuller implements URIDataPuller
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath());
}
- }
- if (CompressionUtils.isGz(sourceFile.getName())) {
+ } else if (CompressionUtils.isGz(sourceFile.getName())) {
final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName()));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
Files.asByteSource(sourceFile),
@@ -192,8 +195,9 @@ public class LocalDataSegmentPuller implements URIDataPuller
outFile.getAbsolutePath()
);
return result;
+ } else {
+ throw new SegmentLoadingException("Do not know how to handle source
[%s]", sourceFile.getAbsolutePath());
}
- throw new SegmentLoadingException("Do not know how to handle source [%s]",
sourceFile.getAbsolutePath());
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
index 07222ff0b3..a783d5c31b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@@ -31,6 +32,8 @@ import org.apache.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.UUID;
@@ -38,7 +41,8 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
- private static final String INDEX_FILENAME = "index.zip";
+ public static final String INDEX_DIR = "index";
+ public static final String INDEX_ZIP_FILENAME = "index.zip";
private final LocalDataSegmentPusherConfig config;
@@ -76,7 +80,11 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
log.debug("Copying segment[%s] to local filesystem at location[%s]", segment.getId(), outDir.toString());
+ // Add binary version to the DataSegment object.
+ segment = segment.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+
if (dataSegmentFile.equals(outDir)) {
+ // Input and output directories are the same. Compute size, build a loadSpec, and return.
long size = 0;
for (File file : dataSegmentFile.listFiles()) {
size += file.length();
@@ -85,19 +93,35 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+ } else if (config.isZip()) {
+ return pushZip(dataSegmentFile, outDir, segment);
+ } else {
+ return pushNoZip(dataSegmentFile, outDir, segment);
}
+ }
- final File tmpOutDir = new File(config.getStorageDirectory(), makeIntermediateDir());
- log.debug("Creating intermediate directory[%s] for segment[%s].", tmpOutDir.toString(), segment.getId());
- FileUtils.mkdirp(tmpOutDir);
+ @Override
+ public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+ {
+ return ImmutableMap.of("type", "local", "path",
finalIndexZipFilePath.getPath());
+ }
- try {
- final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
- final long size = compressSegment(dataSegmentFile, tmpIndexFile);
+ private String makeIntermediateDir()
+ {
+ return "intermediate_pushes/" + UUID.randomUUID();
+ }
- final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
- .withSize(size)
- .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+ private DataSegment pushZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException
+ {
+ final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir());
+ final File tmpIndexFile = new File(tmpSegmentDir, INDEX_ZIP_FILENAME);
+
+ log.debug("Creating intermediate directory[%s] for segment[%s].", tmpSegmentDir.toString(), baseSegment.getId());
+ FileUtils.mkdirp(tmpSegmentDir);
+
+ try {
+ log.debug("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);
+ final long size = CompressionUtils.zip(inDir, tmpIndexFile, true);
FileUtils.mkdirp(outDir);
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
@@ -106,27 +130,61 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
- return dataSegment;
+ return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_ZIP_FILENAME).toURI()))
+ .withSize(size);
}
finally {
- FileUtils.deleteDirectory(tmpOutDir);
+ FileUtils.deleteDirectory(tmpSegmentDir);
}
}
- @Override
- public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+ private DataSegment pushNoZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException
{
- return ImmutableMap.of("type", "local", "path",
finalIndexZipFilePath.getPath());
- }
+ final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir());
+ FileUtils.mkdirp(tmpSegmentDir);
- private String makeIntermediateDir()
- {
- return "intermediate_pushes/" + UUID.randomUUID();
- }
+ try {
+ final File[] files = inDir.listFiles();
+ if (files == null) {
+ throw new IOE("Cannot list directory [%s]", inDir);
+ }
- private long compressSegment(File dataSegmentFile, File dest) throws IOException
- {
- log.debug("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
- return CompressionUtils.zip(dataSegmentFile, dest, true);
+ long size = 0;
+ for (final File file : files) {
+ if (file.isFile()) {
+ size += file.length();
+ FileUtils.linkOrCopy(file, new File(tmpSegmentDir, file.getName()));
+ } else {
+ // Segment directories are expected to be flat.
+ throw new IOE("Unexpected subdirectory [%s]", file.getName());
+ }
+ }
+
+ final File segmentDir = new File(outDir, INDEX_DIR);
+ FileUtils.mkdirp(outDir);
+
+ try {
+ Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ catch (IOException e) {
+ if (segmentDir.exists()) {
+ // Move old directory out of the way, then try again. This makes the latest push win when we push to the
+ // same directory twice, so behavior is compatible with the zip style of pushing.
+ Files.move(
+ segmentDir.toPath(),
+ new File(outDir, StringUtils.format("%s_old_%s", INDEX_DIR, UUID.randomUUID())).toPath(),
+ StandardCopyOption.ATOMIC_MOVE
+ );
+
+ Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ }
+
+ return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_DIR).toURI()))
+ .withSize(size);
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpSegmentDir);
+ }
}
}
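
The subtle part of pushNoZip is the replace-on-conflict handling around Files.move. The same pattern in isolation (a hypothetical helper; unlike the commit, this sketch rethrows when the target directory is genuinely missing rather than swallowing the error):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class AtomicReplaceExample
{
  // Atomically move "tmp" into place at "target". If "target" already exists,
  // shunt the old version aside under a unique name and retry, so the latest
  // push wins -- the approach pushNoZip uses for segment directories.
  static void moveIntoPlace(final Path tmp, final Path target) throws IOException
  {
    try {
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
    catch (IOException e) {
      if (Files.exists(target)) {
        final Path old = target.resolveSibling(target.getFileName() + "_old_" + UUID.randomUUID());
        Files.move(target, old, StandardCopyOption.ATOMIC_MOVE);
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
      } else {
        throw e;
      }
    }
  }
}
```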
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
index 15868da4de..e539ad20af 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java
@@ -30,8 +30,16 @@ public class LocalDataSegmentPusherConfig
@JsonProperty
public File storageDirectory = new File("/tmp/druid/localStorage");
+ @JsonProperty
+ public boolean zip = false;
+
public File getStorageDirectory()
{
return storageDirectory;
}
+
+ public boolean isZip()
+ {
+ return zip;
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
index bc17677f36..2dd6667165 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java
@@ -29,13 +29,30 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
+@RunWith(Parameterized.class)
public class LocalDataSegmentKillerTest
{
+ private static final String DATASOURCE_NAME = "ds";
+
+ private final boolean zip;
+
+ public LocalDataSegmentKillerTest(boolean zip)
+ {
+ this.zip = zip;
+ }
+
+ @Parameterized.Parameters(name = "zip = {0}")
+ public static Iterable<Object[]> constructorFeeder()
+ {
+ return ImmutableList.of(new Object[]{false}, new Object[]{true});
+ }
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -46,12 +63,12 @@ public class LocalDataSegmentKillerTest
LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
// Create following segments and then delete them in this order and assert directory deletions
- // /tmp/dataSource/interval1/v1/0/index.zip
- // /tmp/dataSource/interval1/v1/1/index.zip
- // /tmp/dataSource/interval1/v2/0/index.zip
- // /tmp/dataSource/interval2/v1/0/index.zip
+ // /tmp/dataSource/interval1/v1/0/
+ // /tmp/dataSource/interval1/v1/1/
+ // /tmp/dataSource/interval1/v2/0/
+ // /tmp/dataSource/interval2/v1/0/
- final File dataSourceDir = temporaryFolder.newFolder();
+ final File dataSourceDir = temporaryFolder.newFolder(DATASOURCE_NAME);
File interval1Dir = new File(dataSourceDir, "interval1");
File version11Dir = new File(interval1Dir, "v1");
@@ -72,27 +89,28 @@ public class LocalDataSegmentKillerTest
makePartitionDirWithIndex(partition012Dir);
- killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString()));
+ killer.kill(getSegmentWithPath(partition011Dir));
Assert.assertFalse(partition011Dir.exists());
Assert.assertTrue(partition111Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
- killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString()));
+ killer.kill(getSegmentWithPath(partition111Dir));
Assert.assertFalse(version11Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
- killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString()));
+ killer.kill(getSegmentWithPath(partition021Dir));
Assert.assertFalse(interval1Dir.exists());
Assert.assertTrue(partition012Dir.exists());
- killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString()));
+ killer.kill(getSegmentWithPath(partition012Dir));
Assert.assertFalse(dataSourceDir.exists());
+ Assert.assertTrue(dataSourceDir.getParentFile().exists());
}
@Test
@@ -100,7 +118,8 @@ public class LocalDataSegmentKillerTest
{
final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
final String uuid = UUID.randomUUID().toString().substring(0, 5);
- final File dataSourceDir = temporaryFolder.newFolder("dataSource");
+ final File emptyParentDir = temporaryFolder.newFolder();
+ final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME);
final File intervalDir = new File(dataSourceDir, "interval");
final File versionDir = new File(intervalDir, "1");
final File partitionDir = new File(versionDir, "0");
@@ -108,30 +127,69 @@ public class LocalDataSegmentKillerTest
makePartitionDirWithIndex(uuidDir);
- killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString()));
+ killer.kill(getSegmentWithPath(uuidDir));
Assert.assertFalse(uuidDir.exists());
Assert.assertFalse(partitionDir.exists());
Assert.assertFalse(versionDir.exists());
Assert.assertFalse(intervalDir.exists());
Assert.assertFalse(dataSourceDir.exists());
+
+ // Verify that we stop after the datasource dir, even though the parent is empty.
+ Assert.assertTrue(emptyParentDir.exists());
+ Assert.assertEquals(0, emptyParentDir.listFiles().length);
+ }
+
+ @Test
+ public void testKillUniquePathWrongDataSourceNameInDirectory() throws Exception
+ {
+ // Verify that pruning stops after four levels when no directory matches the datasource name.
+ final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
+ final String uuid = UUID.randomUUID().toString().substring(0, 5);
+ final File emptyParentDir = temporaryFolder.newFolder();
+ final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME + "_wrong");
+ final File intervalDir = new File(dataSourceDir, "interval");
+ final File versionDir = new File(intervalDir, "1");
+ final File partitionDir = new File(versionDir, "0");
+ final File uuidDir = new File(partitionDir, uuid);
+
+ makePartitionDirWithIndex(uuidDir);
+
+ killer.kill(getSegmentWithPath(uuidDir));
+
+ Assert.assertFalse(uuidDir.exists());
+ Assert.assertFalse(partitionDir.exists());
+ Assert.assertFalse(versionDir.exists());
+ Assert.assertFalse(intervalDir.exists());
+ Assert.assertFalse(dataSourceDir.exists());
+
+ // Verify that we stop at 4 pruned paths, even if we don't encounter the datasource-named directory.
+ Assert.assertTrue(emptyParentDir.exists());
+ Assert.assertEquals(0, emptyParentDir.listFiles().length);
}
private void makePartitionDirWithIndex(File path) throws IOException
{
FileUtils.mkdirp(path);
- Assert.assertTrue(new File(path, "index.zip").createNewFile());
+
+ if (zip) {
+ Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_ZIP_FILENAME).createNewFile());
+ } else {
+ Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_DIR).mkdir());
+ }
}
- private DataSegment getSegmentWithPath(String path)
+ private DataSegment getSegmentWithPath(File baseDirectory)
{
+ final String fileName = zip ? LocalDataSegmentPusher.INDEX_ZIP_FILENAME : LocalDataSegmentPusher.INDEX_DIR;
+ final File path = new File(baseDirectory, fileName);
return new DataSegment(
- "dataSource",
+ DATASOURCE_NAME,
Intervals.of("2000/3000"),
"ver",
ImmutableMap.of(
"type", "local",
- "path", path
+ "path", path.toURI().getPath()
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
index 739075e5e1..8651ac7a8e 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
@@ -48,7 +48,9 @@ public class LocalDataSegmentPusherTest
public ExpectedException exception = ExpectedException.none();
LocalDataSegmentPusher localDataSegmentPusher;
+ LocalDataSegmentPusher localDataSegmentPusherZip;
LocalDataSegmentPusherConfig config;
+ LocalDataSegmentPusherConfig configZip;
File dataSegmentFiles;
DataSegment dataSegment = new DataSegment(
"ds",
@@ -77,14 +79,53 @@ public class LocalDataSegmentPusherTest
public void setUp() throws IOException
{
config = new LocalDataSegmentPusherConfig();
+ config.zip = false;
config.storageDirectory = temporaryFolder.newFolder();
localDataSegmentPusher = new LocalDataSegmentPusher(config);
+
+ configZip = new LocalDataSegmentPusherConfig();
+ configZip.zip = true;
+ configZip.storageDirectory = temporaryFolder.newFolder();
+ localDataSegmentPusherZip = new LocalDataSegmentPusher(configZip);
+
dataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9));
}
@Test
- public void testPush() throws IOException
+ public void testPushZip() throws IOException
+ {
+ /* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
+ File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
+ */
+ final DataSegment dataSegment2 = dataSegment.withVersion("v2");
+
+ DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+ DataSegment returnSegment2 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment2, false);
+
+ Assert.assertNotNull(returnSegment1);
+ Assert.assertEquals(dataSegment, returnSegment1);
+
+ Assert.assertNotNull(returnSegment2);
+ Assert.assertEquals(dataSegment2, returnSegment2);
+
+ Assert.assertNotEquals(
+ localDataSegmentPusherZip.getStorageDir(dataSegment, false),
+ localDataSegmentPusherZip.getStorageDir(dataSegment2, false)
+ );
+
+ for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
+ File outDir = new File(
+ configZip.getStorageDirectory(),
+ localDataSegmentPusherZip.getStorageDir(returnSegment, false)
+ );
+ File versionFile = new File(outDir, "index.zip");
+ Assert.assertTrue(versionFile.exists());
+ }
+ }
+
+ @Test
+ public void testPushNoZip() throws IOException
{
/* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
@@ -107,19 +148,43 @@ public class LocalDataSegmentPusherTest
for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
File outDir = new File(
- config.getStorageDirectory(),
- localDataSegmentPusher.getStorageDir(returnSegment, false)
+ new File(
+ config.getStorageDirectory(),
+ localDataSegmentPusher.getStorageDir(returnSegment, false)
+ ),
+ "index"
);
- File versionFile = new File(outDir, "index.zip");
+
+ // Check against loadSpec.
+ Assert.assertEquals(
+ outDir.toURI().getPath(),
+ returnSegment.getLoadSpec().get("path")
+ );
+
+ // Check for version.bin.
+ File versionFile = new File(outDir, "version.bin");
Assert.assertTrue(versionFile.exists());
}
}
@Test
- public void testPushUseUniquePath() throws IOException
+ public void testPushNoZipUseUniquePath() throws IOException
{
DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
+ String path = segment.getLoadSpec().get("path").toString();
+ Pattern pattern = Pattern.compile(
+ ".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index/$"
+ );
+ Assert.assertTrue(path, pattern.matcher(path).matches());
+ Assert.assertTrue(new File(path).exists());
+ }
+
+ @Test
+ public void testPushZipUseUniquePath() throws IOException
+ {
+ DataSegment segment = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, true);
+
String path = segment.getLoadSpec().get("path").toString();
Pattern pattern = Pattern.compile(
".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip"
@@ -129,8 +194,12 @@ public class LocalDataSegmentPusherTest
}
@Test
- public void testLastPushWinsForConcurrentPushes() throws IOException
+ public void testLastPushWinsForConcurrentNoZipPushes() throws IOException
{
+ // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
+ // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
+ // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
+
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
@@ -139,10 +208,38 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
- File unzipDir = new File(config.storageDirectory, "unzip");
+ final String expectedPath = StringUtils.format(
+ "%s/%s",
+ config.storageDirectory,
+ "ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index/"
+ );
+
+ Assert.assertEquals(expectedPath, returnSegment1.getLoadSpec().get("path"));
+ Assert.assertEquals(expectedPath, returnSegment2.getLoadSpec().get("path"));
+
+ final File versionFile = new File(expectedPath, "version.bin");
+ Assert.assertEquals(0x8, Ints.fromByteArray(Files.toByteArray(versionFile)));
+ }
+
+ @Test
+ public void testLastPushWinsForConcurrentZipPushes() throws IOException
+ {
+ // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
+ // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
+ // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
+
+ File replicatedDataSegmentFiles = temporaryFolder.newFolder();
+ Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
+ DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+ DataSegment returnSegment2 = localDataSegmentPusherZip.push(replicatedDataSegmentFiles, dataSegment2, false);
+
+ Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
+ Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
+
+ File unzipDir = new File(configZip.storageDirectory, "unzip");
FileUtils.mkdirp(unzipDir);
CompressionUtils.unzip(
- new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
+ new File(configZip.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
unzipDir
);
@@ -160,27 +257,38 @@ public class LocalDataSegmentPusherTest
localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
}
+ @Test
+ public void testPushZipCannotCreateDirectory() throws IOException
+ {
+ exception.expect(IOException.class);
+ exception.expectMessage("Cannot create directory");
+ configZip.storageDirectory = new File(configZip.storageDirectory, "xxx");
+ Assert.assertTrue(configZip.storageDirectory.mkdir());
+ configZip.storageDirectory.setWritable(false);
+ localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
+ }
+
@Test
public void testPathForHadoopAbsolute()
{
- config.storageDirectory = new File("/druid");
+ configZip.storageDirectory = new File("/druid");
// If this test fails because the path is returned as "file:/druid/", this can happen
// when a /druid directory exists on the local filesystem.
Assert.assertEquals(
"file:/druid",
- new LocalDataSegmentPusher(config).getPathForHadoop()
+ new LocalDataSegmentPusher(configZip).getPathForHadoop()
);
}
@Test
public void testPathForHadoopRelative()
{
- config.storageDirectory = new File("druid");
+ configZip.storageDirectory = new File("druid");
Assert.assertEquals(
StringUtils.format("file:%s/druid", System.getProperty("user.dir")),
- new LocalDataSegmentPusher(config).getPathForHadoop()
+ new LocalDataSegmentPusher(configZip).getPathForHadoop()
);
}
}
diff --git a/website/.spelling b/website/.spelling
index e52ecebbc5..c9562c34df 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -739,6 +739,8 @@ appenders
druid-hdfs-storage
druid-s3-extensions
druid.sql.planner.maxNumericInFilters
+Minio
+multi-server
- ../docs/dependencies/metadata-storage.md
BasicDataSource
- ../docs/dependencies/zookeeper.md
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]