This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.16.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.16.1-incubating by this push:
     new ea60f9f  implement FiniteFirehoseFactory in InlineFirehose (#8682) (#8953)
ea60f9f is described below

commit ea60f9f403fec2fd1b2e2a770bb97b2306b71b30
Author: Jonathan Wei <[email protected]>
AuthorDate: Wed Nov 27 01:30:42 2019 -0800

    implement FiniteFirehoseFactory in InlineFirehose (#8682) (#8953)
    
    * implement FiniteFirehoseFactory in InlineFirehose
    
    * override isSplittable in InlineFirehoseFactory & improve tests
---
 .../realtime/firehose/InlineFirehoseFactory.java   | 30 ++++++++++++++++++++--
 .../firehose/InlineFirehoseFactoryTest.java        | 21 +++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java
index d33ba9a..dec2ab0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java
@@ -22,19 +22,21 @@ package org.apache.druid.segment.realtime.firehose;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
-import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.stream.Stream;
 
 /**
  * Creates firehose that produces data inlined in its own spec
  */
-public class InlineFirehoseFactory implements 
FirehoseFactory<StringInputRowParser>
+public class InlineFirehoseFactory implements 
FiniteFirehoseFactory<StringInputRowParser, String>
 {
   private final String data;
 
@@ -74,4 +76,28 @@ public class InlineFirehoseFactory implements FirehoseFactory<StringInputRowPars
   {
     return Objects.hash(data);
   }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public Stream<InputSplit<String>> getSplits()
+  {
+    return Stream.of(new InputSplit<>(data));
+  }
+
+  @Override
+  public int getNumSplits()
+  {
+    return 1;
+  }
+
+  @Override
+  public FiniteFirehoseFactory<StringInputRowParser, String> 
withSplit(InputSplit<String> split)
+  {
+    return new InlineFirehoseFactory(split.get());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java
index fe9d797..6bc9a0e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java
@@ -20,8 +20,10 @@
 package org.apache.druid.segment.realtime.firehose;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
@@ -37,6 +39,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 @SuppressWarnings({"NullableProblems", "ConstantConditions"})
 public class InlineFirehoseFactoryTest
@@ -77,6 +80,14 @@ public class InlineFirehoseFactoryTest
     target = new InlineFirehoseFactory(DATA);
   }
 
+  @Test
+  public void testInterfaceImplementation()
+  {
+    Assert.assertTrue(target instanceof FiniteFirehoseFactory);
+    Assert.assertFalse(target.isSplittable());
+    Assert.assertEquals(1, target.getNumSplits());
+  }
+
   @Test(expected = NullPointerException.class)
   public void testContstructorDataRequired()
   {
@@ -102,6 +113,16 @@ public class InlineFirehoseFactoryTest
   }
 
   @Test
+  public void testForcedSplitAndClone()
+  {
+    Optional<InputSplit<String>> inputSplitOptional = 
target.getSplits().findFirst();
+    Assert.assertTrue(inputSplitOptional.isPresent());
+    FiniteFirehoseFactory<StringInputRowParser, String> cloneWithSplit = 
target.withSplit(inputSplitOptional.get());
+    Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory);
+    Assert.assertEquals(DATA, ((InlineFirehoseFactory) 
cloneWithSplit).getData());
+  }
+
+  @Test
   public void testSerde() throws IOException
   {
     final ObjectMapper objectMapper = new DefaultObjectMapper();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to