Repository: samza Updated Branches: refs/heads/master b71b253d2 -> 6726e1d10
Fix HdfsSystemAdmin when staging directory is empty. getSystemStreamMetadata has the potential side effect of persisting metadata to a staging directory on HDFS. This could fail if the staging directory is empty. This patch addresses the issue and adds a test to cover the scenario. Author: Hai Lu <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #151 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6726e1d1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6726e1d1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6726e1d1 Branch: refs/heads/master Commit: 6726e1d10964603826a23ad88896a27ae35ec150 Parents: b71b253 Author: Hai Lu <[email protected]> Authored: Tue May 2 12:09:55 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Tue May 2 12:09:55 2017 -0700 ---------------------------------------------------------------------- .../samza/system/hdfs/HdfsSystemAdmin.java | 29 +++++++++++++++- .../samza/system/hdfs/HdfsSystemConsumer.java | 7 ++++ .../hdfs/partitioner/DirectoryPartitioner.java | 7 ++-- .../system/hdfs/TestHdfsSystemConsumer.java | 36 +++++++++++++++++++- 4 files changed, 75 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java index 8bf31c5..f5b05fb 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java @@ -21,6 +21,7 @@ package org.apache.samza.system.hdfs; import java.io.IOException; import 
java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -116,10 +117,17 @@ public class HdfsSystemAdmin implements SystemAdmin { } static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) { + if (StringUtils.isBlank(stagingDirectory)) { + LOG.info("Empty or null staging directory: {}", stagingDirectory); + return Collections.emptyMap(); + } + if (StringUtils.isBlank(streamName)) { + throw new SamzaException(String.format("stream name (%s) is null or empty!", streamName)); + } Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); try (FileSystem fs = path.getFileSystem(new Configuration())) { if (!fs.exists(path)) { - return null; + return Collections.emptyMap(); } try (FSDataInputStream fis = fs.open(path)) { String json = IOUtils.toString(fis, StandardCharsets.UTF_8); @@ -135,6 +143,10 @@ public class HdfsSystemAdmin implements SystemAdmin { */ private void persistPartitionDescriptor(String streamName, Map<Partition, List<String>> partitionDescriptorMap) { + if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) { + LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName); + return; + } Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); try (FileSystem fs = targetPath.getFileSystem(new Configuration())) { // Partition descriptor is supposed to be immutable. So don't override it if it exists. 
@@ -153,6 +165,10 @@ public class HdfsSystemAdmin implements SystemAdmin { } private boolean partitionDescriptorExists(String streamName) { + if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) { + LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName); + return false; + } Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); try (FileSystem fs = targetPath.getFileSystem(new Configuration())) { return fs.exists(targetPath); @@ -161,6 +177,17 @@ public class HdfsSystemAdmin implements SystemAdmin { } } + /** + * + * Fetch metadata from hdfs system for a set of streams. This has the potential side effect + * to persist partition description to the staging directory on hdfs if staging directory + * is not empty. See getStagingDirectory on {@link HdfsConfig} + * + * @param streamNames + * The streams to fetch metadata for. + * @return A map from stream name to SystemStreamMetadata for each stream + * requested in the parameter set. 
+ */ @Override public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index 13a7102..fb9bb56 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; import org.apache.samza.Partition; import org.apache.samza.SamzaException; @@ -132,6 +133,12 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { public Map<Partition, List<String>> load(String streamName) throws Exception { Validate.notEmpty(streamName); + if (StringUtils.isBlank(stagingDirectory)) { + throw new SamzaException("Staging directory can't be empty. " + + "Is this not a yarn job (currently hdfs system consumer only works in " + + "the same yarn environment on which hdfs is running)? 
" + "Is STAGING_DIRECTORY (" + + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?"); + } return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java index 5cad1e4..0661139 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java @@ -20,6 +20,7 @@ package org.apache.samza.system.hdfs.partitioner; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -32,6 +33,8 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.samza.Partition; import org.apache.samza.SamzaException; @@ -192,12 +195,12 @@ public class DirectoryPartitioner { public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName, @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) { LOG.info("Trying to obtain metadata for " + streamName); - LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty" + LOG.info("Existing partition descriptor: " + (MapUtils.isEmpty(existingPartitionDescriptorMap) ? 
"empty" : existingPartitionDescriptorMap)); Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>(); partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>()); List<FileMetadata> filteredFiles = getFilteredFiles(streamName); - if (existingPartitionDescriptorMap != null) { + if (!MapUtils.isEmpty(existingPartitionDescriptorMap)) { filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap); } List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles); http://git-wip-us.apache.org/repos/asf/samza/blob/6726e1d1/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java index ef5ab00..21afcb9 100644 --- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java +++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.avro.generic.GenericRecord; import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.system.IncomingMessageEnvelope; @@ -41,7 +42,7 @@ import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Assert; import org.junit.Test; - +import com.google.common.util.concurrent.UncheckedExecutionException; public class TestHdfsSystemConsumer { @@ -133,4 +134,37 @@ public class TestHdfsSystemConsumer { Assert.assertEquals(messages.get(NUM_EVENTS).getOffset(), IncomingMessageEnvelope.END_OF_STREAM_OFFSET); }); } + + /* + * Ensure that empty staging directory will not break system admin, + * but should fail system consumer + */ + @Test + public void 
testEmptyStagingDirectory() throws Exception { + Map<String, String> configMap = new HashMap<>(); + configMap.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*avro"); + Config config = new MapConfig(configMap); + HdfsSystemFactory systemFactory = new HdfsSystemFactory(); + + // create admin and do partitioning + HdfsSystemAdmin systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config); + String stream = WORKING_DIRECTORY; + Set<String> streamNames = new HashSet<>(); + streamNames.add(stream); + generateAvroDataFiles(); + Map<String, SystemStreamMetadata> streamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames); + SystemStreamMetadata systemStreamMetadata = streamMetadataMap.get(stream); + Assert.assertEquals(NUM_FILES, systemStreamMetadata.getSystemStreamPartitionMetadata().size()); + + // create consumer and read from files + HdfsSystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, new NoOpMetricsRegistry()); + Partition partition = new Partition(0); + SystemStreamPartition ssp = new SystemStreamPartition(SYSTEM_NAME, stream, partition); + try { + systemConsumer.register(ssp, "0"); + Assert.fail("Empty staging directory should fail system consumer"); + } catch (UncheckedExecutionException e) { + Assert.assertTrue(e.getCause() instanceof SamzaException); + } + } }
