This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a16b64a Add EventTime interface to Record (#2446)
a16b64a is described below
commit a16b64a0615b145f695987464cf281f68175985b
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Sat Aug 25 22:33:08 2018 -0700
Add EventTime interface to Record (#2446)
* Saved copy of work
* revert conf changes
* Prepare for the pr
* Addressed reviewer comments
---
.../org/apache/pulsar/functions/api/Record.java | 9 ++++++++
.../apache/pulsar/functions/sink/PulsarSink.java | 7 ++++++
.../pulsar/functions/source/PulsarRecord.java | 9 ++++++++
.../org/apache/pulsar/io/twitter/TweetData.java | 1 +
.../apache/pulsar/io/twitter/TwitterFireHose.java | 27 ++++++++++++++++++++--
.../pulsar/io/twitter/TwitterFireHoseConfig.java | 3 +++
6 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 2704909..59cc104 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -47,6 +47,15 @@ public interface Record<T> {
T getValue();
/**
+ * Retrieves the event time of the record from the source.
+ *
+ * @return millis since epoch
+ */
+ default Optional<Long> getEventTime() {
+ return Optional.empty();
+ }
+
+ /**
* Retrieves the partition information if any of the record.
*
* @return The partition id where the
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index c1da686..c3df393 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.Base64;
import java.util.Map;
+import java.util.Optional;
import lombok.AccessLevel;
import lombok.Getter;
@@ -239,6 +240,12 @@ public class PulsarSink<T> implements Sink<T> {
msg.property("__pfn_input_topic__",
pulsarRecord.getTopicName().get())
.property("__pfn_input_msg_id__",
new
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+ } else {
+ // It is coming from some source
+ Optional<Long> eventTime =
sinkRecord.getSourceRecord().getEventTime();
+ if (eventTime.isPresent()) {
+ msg.eventTime(eventTime.get());
+ }
}
pulsarSinkProcessor.sendOutputMessage(msg, record);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 46c9213..359f48e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -75,6 +75,15 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
}
@Override
+ public Optional<Long> getEventTime() {
+ if (message.getEventTime() != 0) {
+ return Optional.of(message.getEventTime());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
public Optional<EncryptionContext> getEncryptionCtx() {
return message.getEncryptionCtx();
}
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 3e5503d..36d4dc8 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -44,6 +44,7 @@ public class TweetData {
private String timestampMs;
private Delete delete;
+
@Data
public static class User {
private Long id;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 2631db1..fc61945 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -34,9 +34,12 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Map;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
@@ -46,6 +49,7 @@ import org.slf4j.LoggerFactory;
/**
* Simple Push based Twitter FireHose Source
*/
+@Slf4j
public class TwitterFireHose extends PushSource<TweetData> {
private static final Logger LOG =
LoggerFactory.getLogger(TwitterFireHose.class);
@@ -126,7 +130,7 @@ public class TwitterFireHose extends PushSource<TweetData> {
// We don't really care if the record succeeds or
not.
// However might be in the future to count failures
// TODO:- Figure out the metrics story for
connectors
- consume(new TwitterRecord(tweet));
+ consume(new TwitterRecord(tweet,
config.getGuestimateTweetTime()));
} catch (Exception e) {
LOG.error("Exception thrown: {}", e);
}
@@ -166,9 +170,12 @@ public class TwitterFireHose extends PushSource<TweetData> {
static private class TwitterRecord implements Record<TweetData> {
private final TweetData tweet;
+ private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE
MMM d HH:mm:ss Z yyyy");
+ private final boolean guestimateTweetTime;
- public TwitterRecord(TweetData tweet) {
+ public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
this.tweet = tweet;
+ this.guestimateTweetTime = guestimateTweetTime;
}
@Override
@@ -178,6 +185,22 @@ public class TwitterFireHose extends PushSource<TweetData> {
}
@Override
+ public Optional<Long> getEventTime() {
+ try {
+ if (tweet.getCreatedAt() != null) {
+ Date d = dateFormat.parse(tweet.getCreatedAt());
+ return Optional.of(d.toInstant().toEpochMilli());
+ } else if (guestimateTweetTime) {
+ return Optional.of(System.currentTimeMillis());
+ } else {
+ return Optional.empty();
+ }
+ } catch (Exception e) {
+ return Optional.empty();
+ }
+ }
+
+ @Override
public TweetData getValue() {
return tweet;
}
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 83f1baf..88acb33 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -44,6 +44,9 @@ public class TwitterFireHoseConfig implements Serializable {
private String consumerSecret;
private String token;
private String tokenSecret;
+ // Most firehose events have null createdAt time. If this parameter is set
to true
+ // then we estimate the createdTime of each firehose event to be current
time.
+ private Boolean guestimateTweetTime = false;
// ------ Optional property keys