srkukarni closed pull request #2414:  allow predicates concerning publish time 
to push down to pulsar
URL: https://github.com/apache/incubator-pulsar/pull/2414
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 584a376fac..a1187b8255 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -395,7 +395,7 @@ void asyncSkipEntries(int numEntriesToSkip, 
IndividualDeletedEntries deletedEntr
             final SkipEntriesCallback callback, Object ctx);
 
     /**
-     * Find the newest entry that matches the given predicate.
+     * Find the newest entry that matches the given predicate.  Will only 
search among active entries
      *
      * @param condition
      *            predicate that reads an entry an applies a condition
@@ -405,9 +405,25 @@ void asyncSkipEntries(int numEntriesToSkip, 
IndividualDeletedEntries deletedEntr
      */
     Position findNewestMatching(Predicate<Entry> condition) throws 
InterruptedException, ManagedLedgerException;
 
+
+    /**
+     * Find the newest entry that matches the given predicate.
+     *
+     * @param constraint
+     *            search only active entries or all entries
+     * @param condition
+     *            predicate that reads an entry an applies a condition
+     * @return Position of the newest entry that matches the given predicate
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    Position findNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+
     /**
      * Find the newest entry that matches the given predicate.
      *
+     * @param constraint
+     *            search only active entries or all entries
      * @param condition
      *            predicate that reads an entry an applies a condition
      * @param callback
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 995bdf0c76..10997427cb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -20,14 +20,16 @@
 
 import java.util.List;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Range;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 
 public interface ReadOnlyCursor {
     /**
      * Read entries from the ManagedLedger, up to the specified number. The 
returned list can be smaller.
      *
-     * @param numberOfEntriesToRead
-     *            maximum number of entries to return
+     * @param numberOfEntriesToRead maximum number of entries to return
      * @return the list of entries
      * @throws ManagedLedgerException
      */
@@ -36,13 +38,10 @@
     /**
      * Asynchronously read entries from the ManagedLedger.
      *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
      * @see #readEntries(int)
-     * @param numberOfEntriesToRead
-     *            maximum number of entries to return
-     * @param callback
-     *            callback object
-     * @param ctx
-     *            opaque context
      */
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback 
callback, Object ctx);
 
@@ -55,7 +54,7 @@
 
     /**
      * Tells whether this cursor has already consumed all the available 
entries.
-     *
+     * <p>
      * <p/>
      * This method is not blocking.
      *
@@ -65,7 +64,7 @@
 
     /**
      * Return the number of messages that this cursor still has to read.
-     *
+     * <p>
      * <p/>
      * This method has linear time complexity on the number of ledgers 
included in the managed ledger.
      *
@@ -76,11 +75,31 @@
     /**
      * Skip n entries from the read position of this cursor.
      *
-     * @param numEntriesToSkip
-     *            number of entries to skip
+     * @param numEntriesToSkip number of entries to skip
      */
     void skipEntries(int numEntriesToSkip);
 
+    /**
+     * Find the newest entry that matches the given predicate.
+     *
+     * @param constraint search only active entries or all entries
+     * @param condition  predicate that reads an entry an applies a condition
+     * @return Position of the newest entry that matches the given predicate
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    Position findNewestMatching(ManagedCursor.FindPositionConstraint 
constraint, Predicate<Entry> condition) throws InterruptedException, 
ManagedLedgerException;
+
+    /**
+     * Return the number of messages that this cursor still has to read.
+     *
+     * <p/>This method has linear time complexity on the number of ledgers 
included in the managed ledger.
+     *
+     * @param range the range between two positions
+     * @return the number of entries in range
+     */
+    long getNumberOfEntries(Range<PositionImpl> range);
+
     /**
      * Close the cursor and releases the associated resources.
      *
@@ -92,10 +111,8 @@
     /**
      * Close the cursor asynchronously and release the associated resources.
      *
-     * @param callback
-     *            callback object
-     * @param ctx
-     *            opaque context
+     * @param callback callback object
+     * @param ctx      opaque context
      */
     void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index bab354762a..50b47229d6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -687,6 +687,11 @@ public long getNumberOfEntriesInStorage() {
 
     @Override
     public Position findNewestMatching(Predicate<Entry> condition) throws 
InterruptedException, ManagedLedgerException {
+        return findNewestMatching(FindPositionConstraint.SearchActiveEntries, 
condition);
+    }
+
+    @Override
+    public Position findNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException 
{
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedLedgerException exception = null;
@@ -694,7 +699,7 @@ public Position findNewestMatching(Predicate<Entry> 
condition) throws Interrupte
         }
 
         final Result result = new Result();
-        asyncFindNewestMatching(FindPositionConstraint.SearchActiveEntries, 
condition, new FindEntryCallback() {
+        asyncFindNewestMatching(constraint, condition, new FindEntryCallback() 
{
             @Override
             public void findEntryComplete(Position position, Object ctx) {
                 result.position = position;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index df0c08c707..c1a2216b4b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -32,7 +32,7 @@
 public class ReadOnlyCursorImpl extends ManagedCursorImpl implements 
ReadOnlyCursor {
 
     public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig 
config, ManagedLedgerImpl ledger,
-            PositionImpl startPosition, String cursorName) {
+                              PositionImpl startPosition, String cursorName) {
         super(bookkeeper, config, ledger, cursorName);
 
         if (startPosition.equals(PositionImpl.earliest)) {
@@ -47,7 +47,7 @@ public ReadOnlyCursorImpl(BookKeeper bookkeeper, 
ManagedLedgerConfig config, Man
             messagesConsumedCounter = 
-getNumberOfEntries(Range.closed(readPosition, ledger.getLastPosition()));
         }
 
-        state = State.NoLedger;
+        this.state = State.NoLedger;
     }
 
     @Override
@@ -62,4 +62,7 @@ public void asyncClose(final AsyncCallbacks.CloseCallback 
callback, final Object
         callback.closeComplete(ctx);
     }
 
+    public long getNumberOfEntries(Range<PositionImpl> range) {
+        return this.ledger.getNumberOfEntries(range);
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 8e3ea7f119..052aa0e771 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -21,9 +21,12 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
+import com.google.common.collect.Range;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
@@ -35,6 +38,7 @@
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 
+@Slf4j
 public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
 
     public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper, MetaStore store,
@@ -124,6 +128,22 @@ private ReadOnlyCursor createReadOnlyCursor(PositionImpl 
startPosition) {
         return cursor;
     }
 
+    @Override
+    void asyncReadEntry(PositionImpl position, 
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
+            this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) 
-> {
+                this.entryCache.asyncReadEntry(ledger, position, callback, 
ctx);
+            }).exceptionally((ex) -> {
+                log.error("[{}] Error opening ledger for reading at position 
{} - {}", new Object[]{this.name, position, ex.getMessage()});
+                
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
+                return null;
+            });
+    }
+
+    @Override
+    public long getNumberOfEntries() {
+        return getNumberOfEntries(Range.openClosed(PositionImpl.earliest, 
getLastPosition()));
+    }
+
     protected boolean isReadOnly() {
         return true;
     }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index ecf6acfc50..3e404d9be0 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -197,6 +197,11 @@ public Position findNewestMatching(Predicate<Entry> 
condition)
             return null;
         }
 
+        @Override
+        public Position findNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException 
{
+            return null;
+        }
+
         @Override
         public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
                 AsyncCallbacks.FindEntryCallback callback, Object ctx) {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 0d391a6a73..8341775971 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -20,6 +20,7 @@
 
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -132,8 +133,9 @@ public Object getData(Message message) {
     public static final PulsarInternalColumn EVENT_TIME = new 
EventTimeColumn("__event_time__", TimestampType
             .TIMESTAMP, "Application defined timestamp in milliseconds of when 
the event occurred");
 
-    public static final PulsarInternalColumn PUBLISH_TIME = new 
PublishTimeColumn("__publish_time__", TimestampType
-            .TIMESTAMP, "The timestamp in milliseconds of when event as 
published");
+    public static final PulsarInternalColumn PUBLISH_TIME = new 
PublishTimeColumn("__publish_time__",
+            TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE,
+            "The timestamp in milliseconds of when event as published");
 
     public static final PulsarInternalColumn MESSAGE_ID = new 
MessageIdColumn("__message_id__", VarcharType.VARCHAR,
             "The message ID of the message used to generate this row");
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index c04156eca7..20a56bb7bd 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -32,11 +32,14 @@
 import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.BooleanType;
 import com.facebook.presto.spi.type.DoubleType;
 import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.RealType;
+import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -62,6 +65,8 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
 import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
@@ -116,8 +121,9 @@ public ConnectorTableHandle getTableHandle(ConnectorSession 
session, SchemaTable
     public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession 
session, ConnectorTableHandle table,
                                                             
Constraint<ColumnHandle> constraint,
                                                             
Optional<Set<ColumnHandle>> desiredColumns) {
+
         PulsarTableHandle handle = convertTableHandle(table);
-        ConnectorTableLayout layout = new ConnectorTableLayout(new 
PulsarTableLayoutHandle(handle));
+        ConnectorTableLayout layout = new ConnectorTableLayout(new 
PulsarTableLayoutHandle(handle, constraint.getSummary()));
         return ImmutableList.of(new ConnectorTableLayoutResult(layout, 
constraint.getSummary()));
     }
 
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 3523228763..8593c55ccf 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,6 +20,7 @@
 
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -55,12 +56,15 @@
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static 
com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static com.facebook.presto.spi.type.DateTimeEncoding.unpackMillisUtc;
 import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.IntegerType.INTEGER;
 import static com.facebook.presto.spi.type.RealType.REAL;
 import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
 import static com.facebook.presto.spi.type.TimeType.TIME;
 import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static 
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static com.facebook.presto.spi.type.TinyintType.TINYINT;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -264,7 +268,7 @@ public long getLong(int field) {
         if (type.equals(BIGINT)) {
             return ((Number) record).longValue();
         } else if (type.equals(DATE)) {
-            return MILLISECONDS.toDays(new 
Date(TimeUnit.DAYS.toMillis(((Number) record).longValue())).getTime());
+            return ((Number) record).longValue();
         } else if (type.equals(INTEGER)) {
             return (int) record;
         } else if (type.equals(REAL)) {
@@ -272,9 +276,11 @@ public long getLong(int field) {
         } else if (type.equals(SMALLINT)) {
             return ((Number) record).shortValue();
         } else if (type.equals(TIME)) {
-            return new Time(((Number) record).longValue()).getTime();
+            return ((Number) record).longValue();
         } else if (type.equals(TIMESTAMP)) {
-            return new Timestamp(((Number) record).longValue()).getTime();
+            return ((Number) record).longValue();
+        } else if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
+            return packDateTimeWithZone(((Number) record).longValue(), 0);
         } else if (type.equals(TINYINT)) {
             return Byte.parseByte(record.toString());
         } else {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 580eb479c8..4bd1171ac0 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.predicate.TupleDomain;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
@@ -43,7 +45,7 @@
     private final long endPositionEntryId;
     private final long startPositionLedgerId;
     private final long endPositionLedgerId;
-
+    private final TupleDomain<ColumnHandle> tupleDomain;
 
     @JsonCreator
     public PulsarSplit(
@@ -57,7 +59,8 @@ public PulsarSplit(
             @JsonProperty("startPositionEntryId") long startPositionEntryId,
             @JsonProperty("endPositionEntryId") long endPositionEntryId,
             @JsonProperty("startPositionLedgerId") long startPositionLedgerId,
-            @JsonProperty("endPositionLedgerId") long endPositionLedgerId) {
+            @JsonProperty("endPositionLedgerId") long endPositionLedgerId,
+            @JsonProperty("tupleDomain") TupleDomain<ColumnHandle> 
tupleDomain) {
         this.splitId = splitId;
         this.schemaName = requireNonNull(schemaName, "schema name is null");
         this.connectorId = requireNonNull(connectorId, "connector id is null");
@@ -69,6 +72,7 @@ public PulsarSplit(
         this.endPositionEntryId = endPositionEntryId;
         this.startPositionLedgerId = startPositionLedgerId;
         this.endPositionLedgerId = endPositionLedgerId;
+        this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
     }
 
     @JsonProperty
@@ -126,6 +130,11 @@ public long getEndPositionLedgerId() {
         return endPositionLedgerId;
     }
 
+    @JsonProperty
+    public TupleDomain<ColumnHandle> getTupleDomain() {
+        return tupleDomain;
+    }
+
     public PositionImpl getStartPosition() {
         return PositionImpl.get(startPositionLedgerId, startPositionEntryId);
     }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 721b8e6d88..488609d7a3 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -18,14 +18,21 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.FixedSplitSource;
 import com.facebook.presto.spi.connector.ConnectorSplitManager;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
 import com.google.common.annotations.VisibleForTesting;
 import io.airlift.log.Logger;
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -35,9 +42,11 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.com.google.common.base.Predicate;
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import javax.inject.Inject;
@@ -46,7 +55,9 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
 
 public class PulsarSplitManager implements ConnectorSplitManager {
 
@@ -79,6 +90,7 @@ public ConnectorSplitSource 
getSplits(ConnectorTransactionHandle transactionHand
 
         PulsarTableLayoutHandle layoutHandle = (PulsarTableLayoutHandle) 
layout;
         PulsarTableHandle tableHandle = layoutHandle.getTable();
+        TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
 
         TopicName topicName = TopicName.get("persistent", 
NamespaceName.get(tableHandle.getSchemaName()),
                 tableHandle.getTableName());
@@ -95,10 +107,10 @@ public ConnectorSplitSource 
getSplits(ConnectorTransactionHandle transactionHand
         Collection<PulsarSplit> splits;
         try {
             if (!PulsarConnectorUtils.isPartitionedTopic(topicName, 
this.pulsarAdmin)) {
-                splits = getSplitsNonPartitionedTopic(numSplits, topicName, 
tableHandle, schemaInfo);
+                splits = getSplitsNonPartitionedTopic(numSplits, topicName, 
tableHandle, schemaInfo, tupleDomain);
                 log.debug("Splits for non-partitioned topic %s: %s", 
topicName, splits);
             } else {
-                splits = getSplitsPartitionedTopic(numSplits, topicName, 
tableHandle, schemaInfo);
+                splits = getSplitsPartitionedTopic(numSplits, topicName, 
tableHandle, schemaInfo, tupleDomain);
                 log.debug("Splits for partitioned topic %s: %s", topicName, 
splits);
             }
         } catch (Exception e) {
@@ -119,7 +131,7 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
 
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName 
topicName, PulsarTableHandle
-            tableHandle, SchemaInfo schemaInfo) throws Exception {
+            tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> 
tupleDomain) throws Exception {
         int numPartitions;
         try {
             numPartitions = 
(this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
@@ -148,7 +160,8 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
                                 splitsForThisPartition,
                                 tableHandle,
                                 schemaInfo,
-                                topicName.getPartition(i).getLocalName())
+                                topicName.getPartition(i).getLocalName(),
+                                tupleDomain)
                 );
             }
             return splits;
@@ -163,12 +176,38 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
         }
     }
 
+    @VisibleForTesting
+    Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, 
TopicName topicName, PulsarTableHandle
+            tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> 
tupleDomain) throws Exception {
+        ManagedLedgerFactory managedLedgerFactory = null;
+        try {
+            managedLedgerFactory = getManagedLedgerFactory();
+
+            return getSplitsForTopic(
+                    topicName.getPersistenceNamingEncoding(),
+                    managedLedgerFactory,
+                    numSplits,
+                    tableHandle,
+                    schemaInfo,
+                    tableHandle.getTableName(), tupleDomain);
+        } finally {
+            if (managedLedgerFactory != null) {
+                try {
+                    managedLedgerFactory.shutdown();
+                } catch (Exception e) {
+                    log.error(e);
+                }
+            }
+        }
+    }
+
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsForTopic(String 
topicNamePersistenceEncoding,
                                               ManagedLedgerFactory 
managedLedgerFactory,
                                               int numSplits,
                                               PulsarTableHandle tableHandle,
-                                              SchemaInfo schemaInfo, String 
tableName)
+                                              SchemaInfo schemaInfo, String 
tableName,
+                                              TupleDomain<ColumnHandle> 
tupleDomain)
             throws ManagedLedgerException, InterruptedException {
 
         ReadOnlyCursor readOnlyCursor = null;
@@ -176,10 +215,33 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
             readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                     topicNamePersistenceEncoding,
                     PositionImpl.earliest, new ManagedLedgerConfig());
+
             long numEntries = readOnlyCursor.getNumberOfEntries();
             if (numEntries <= 0) {
                 return Collections.EMPTY_LIST;
             }
+
+            PredicatePushdownInfo predicatePushdownInfo = 
PredicatePushdownInfo.getPredicatePushdownInfo(
+                    this.connectorId,
+                    tupleDomain,
+                    managedLedgerFactory,
+                    topicNamePersistenceEncoding,
+                    numEntries);
+
+            PositionImpl initialStartPosition;
+            if (predicatePushdownInfo != null) {
+                numEntries = predicatePushdownInfo.getNumOfEntries();
+                initialStartPosition = 
predicatePushdownInfo.getStartPosition();
+            } else {
+                initialStartPosition = (PositionImpl) 
readOnlyCursor.getReadPosition();
+            }
+
+
+            readOnlyCursor.close();
+            readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
+                    topicNamePersistenceEncoding,
+                    initialStartPosition, new ManagedLedgerConfig());
+
             long remainder = numEntries % numSplits;
 
             long avgEntriesPerSplit = numEntries / numSplits;
@@ -200,7 +262,8 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
                         startPosition.getEntryId(),
                         endPosition.getEntryId(),
                         startPosition.getLedgerId(),
-                        endPosition.getLedgerId()));
+                        endPosition.getLedgerId(),
+                        tupleDomain));
             }
             return splits;
         } finally {
@@ -214,28 +277,119 @@ ManagedLedgerFactory getManagedLedgerFactory() throws 
Exception {
         }
     }
 
-    @VisibleForTesting
-    Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, 
TopicName topicName, PulsarTableHandle
-            tableHandle, SchemaInfo schemaInfo) throws Exception {
-        ManagedLedgerFactory managedLedgerFactory = null;
-        try {
-            managedLedgerFactory = getManagedLedgerFactory();
+    @Data
+    private static class PredicatePushdownInfo {
+        private PositionImpl startPosition;
+        private PositionImpl endPosition;
+        private long numOfEntries;
 
-            return getSplitsForTopic(
-                    topicName.getPersistenceNamingEncoding(),
-                    managedLedgerFactory,
-                    numSplits,
-                    tableHandle,
-                    schemaInfo,
-                    tableHandle.getTableName());
-        } finally {
-            if (managedLedgerFactory != null) {
+        private PredicatePushdownInfo(PositionImpl startPosition, PositionImpl 
endPosition, long numOfEntries) {
+            this.startPosition = startPosition;
+            this.endPosition = endPosition;
+            this.numOfEntries = numOfEntries;
+        }
+
+        public static PredicatePushdownInfo getPredicatePushdownInfo(String 
connectorId,
+                                                                     
TupleDomain<ColumnHandle> tupleDomain,
+                                                                     
ManagedLedgerFactory managedLedgerFactory,
+                                                                     String 
topicNamePersistenceEncoding,
+                                                                     long 
totalNumEntries) throws
+                ManagedLedgerException, InterruptedException {
+
+            ReadOnlyCursor readOnlyCursor = null;
+            try {
+                readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
+                        topicNamePersistenceEncoding,
+                        PositionImpl.earliest, new ManagedLedgerConfig());
+
+                if (tupleDomain.getDomains().isPresent()) {
+                    Domain domain = 
tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
+                            .getColumnHandle(connectorId, false));
+                    if (domain != null) {
+                        // TODO support arbitrary number of ranges
+                        // only worry about one range for now
+                        if (domain.getValues().getRanges().getRangeCount() == 
1) {
+
+                            checkArgument(domain.getType().isOrderable(), 
"Domain type must be orderable");
+
+                            Long upperBoundTs = null;
+                            Long lowerBoundTs = null;
+
+                            Range range = 
domain.getValues().getRanges().getOrderedRanges().get(0);
+
+                            if (!range.getHigh().isUpperUnbounded()) {
+                                upperBoundTs = new 
SqlTimestampWithTimeZone(range.getHigh().getValueBlock().get()
+                                        .getLong(0, 0)).getMillisUtc();
+                            }
+
+                            if (!range.getLow().isLowerUnbounded()) {
+                                lowerBoundTs = new 
SqlTimestampWithTimeZone(range.getLow().getValueBlock().get()
+                                        .getLong(0, 0)).getMillisUtc();
+                            }
+
+                            PositionImpl overallStartPos;
+                            if (lowerBoundTs == null) {
+                                overallStartPos = (PositionImpl) 
readOnlyCursor.getReadPosition();
+                            } else {
+                                overallStartPos = findPosition(readOnlyCursor, 
lowerBoundTs);
+                            }
+
+                            PositionImpl overallEndPos;
+                            if (upperBoundTs == null) {
+
+                                
readOnlyCursor.skipEntries(Math.toIntExact(totalNumEntries));
+                                overallEndPos = (PositionImpl) 
readOnlyCursor.getReadPosition();
+                            } else {
+                                overallEndPos = findPosition(readOnlyCursor, 
upperBoundTs);
+                            }
+
+                            // Just use a close bound since presto can always 
filter out the extra entries even if
+                            // the bound
+                            // should be open or a mixture of open and closed
+                            
org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> posRange
+                                    = 
org.apache.pulsar.shade.com.google.common.collect.Range.range(overallStartPos,
+                                    
org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED,
+                                    overallEndPos, 
org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED);
+
+                            long numOfEntries = 
readOnlyCursor.getNumberOfEntries(posRange) - 1;
+
+                            PredicatePushdownInfo predicatePushdownInfo
+                                    = new 
PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
+                            log.debug("Predicate pushdown optimization 
calculated: %s", predicatePushdownInfo);
+                            return predicatePushdownInfo;
+                        }
+                    }
+                }
+            } finally {
+                if (readOnlyCursor != null) {
+                    readOnlyCursor.close();
+                }
+            }
+            return null;
+        }
+    }
+
+    private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, 
long timestamp) throws
+            ManagedLedgerException,
+            InterruptedException {
+        return (PositionImpl) 
readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new 
Predicate<Entry>() {
+            @Override
+            public boolean apply(Entry entry) {
+                MessageImpl msg = null;
                 try {
-                    managedLedgerFactory.shutdown();
+                    msg = MessageImpl.deserialize(entry.getDataBuffer());
+
+                    return msg.getPublishTime() <= timestamp;
                 } catch (Exception e) {
-                    log.error(e);
+                    log.error(e, "Failed To deserialize message when finding 
position with error: %s", e);
+                } finally {
+                    entry.release();
+                    if (msg != null) {
+                        msg.recycle();
+                    }
                 }
+                return false;
             }
-        }
+        });
     }
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
index 7bdbddad53..a7c240b940 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.predicate.TupleDomain;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Objects;
+
 import static java.util.Objects.requireNonNull;
 
 public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
     private final PulsarTableHandle table;
+    private final TupleDomain<ColumnHandle> tupleDomain;
 
     @JsonCreator
-    public PulsarTableLayoutHandle(@JsonProperty("table") PulsarTableHandle 
table) {
+    public PulsarTableLayoutHandle(@JsonProperty("table") PulsarTableHandle 
table,
+                                   @JsonProperty("tupleDomain") 
TupleDomain<ColumnHandle> domain) {
+
         this.table = requireNonNull(table, "table is null");
+        this.tupleDomain = requireNonNull(domain, "tupleDomain is null");
     }
 
     @JsonProperty
@@ -37,8 +45,35 @@ public PulsarTableHandle getTable() {
         return table;
     }
 
+    @JsonProperty
+    public TupleDomain<ColumnHandle> getTupleDomain()
+    {
+        return tupleDomain;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o;
+        return Objects.equals(table, that.table) &&
+                Objects.equals(tupleDomain, that.tupleDomain);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(table, tupleDomain);
+    }
+
     @Override
-    public String toString() {
+    public String toString()
+    {
         return table.toString();
     }
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index a3fa12d31b..4da7ea57f5 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.facebook.presto.spi.predicate.TupleDomain;
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.BooleanType;
 import com.facebook.presto.spi.type.DoubleType;
@@ -28,6 +29,7 @@
 import io.airlift.log.Logger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -78,6 +80,7 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -86,6 +89,8 @@
 @Test(singleThreaded = true)
 public abstract class TestPulsarConnector {
 
+    protected static final long currentTimeMs = 1534806330000L;
+
     protected PulsarConnectorConfig pulsarConnectorConfig;
 
     protected PulsarMetadata pulsarMetadata;
@@ -104,7 +109,7 @@
     protected static List<TopicName> partitionedTopicNames;
     protected static Map<String, Integer> partitionedTopicsToPartitions;
     protected static Map<String, SchemaInfo> topicsToSchemas;
-    protected static Map<String, Long> topicsToEntries;
+    protected static Map<String, Long> topicsToNumEntries;
 
     protected static final NamespaceName NAMESPACE_NAME_1 = 
NamespaceName.get("tenant-1", "ns-1");
     protected static final NamespaceName NAMESPACE_NAME_2 = 
NamespaceName.get("tenant-1", "ns-2");
@@ -274,19 +279,19 @@ public void setDate(int date) {
         fooTypes.put("time", TIME);
         fooTypes.put("date", DATE);
 
-        topicsToEntries = new HashMap<>();
-        topicsToEntries.put(TOPIC_1.getSchemaName(), 1233L);
-        topicsToEntries.put(TOPIC_2.getSchemaName(), 0L);
-        topicsToEntries.put(TOPIC_3.getSchemaName(), 100L);
-        topicsToEntries.put(TOPIC_4.getSchemaName(), 12345L);
-        topicsToEntries.put(TOPIC_5.getSchemaName(), 8000L);
-        topicsToEntries.put(TOPIC_6.getSchemaName(), 1L);
-        topicsToEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
-        topicsToEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
-        topicsToEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
-        topicsToEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
-        topicsToEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
-        topicsToEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
+        topicsToNumEntries = new HashMap<>();
+        topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
+        topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
+        topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
+        topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
+        topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
+        topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
+        topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
 
         fooColumnHandles = new LinkedList<>();
         for (int i = 0; i < Foo.class.getDeclaredFields().length; i++) {
@@ -315,14 +320,54 @@ public PulsarColumnHandle apply(PulsarInternalColumn 
pulsarInternalColumn) {
         for (TopicName topicName : allTopics) {
             splits.put(topicName, new PulsarSplit(0, 
pulsarConnectorId.toString(),
                     topicName.getNamespace(), topicName.getLocalName(),
-                    topicsToEntries.get(topicName.getSchemaName()),
+                    topicsToNumEntries.get(topicName.getSchemaName()),
                     new 
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
                     topicsToSchemas.get(topicName.getSchemaName()).getType(),
-                    0, topicsToEntries.get(topicName.getSchemaName()),
-                    0, 0));
+                    0, topicsToNumEntries.get(topicName.getSchemaName()),
+                    0, 0, TupleDomain.all()));
         }
     }
 
+    private static List<Entry> getTopicEntries(String topicSchemaName) {
+        List<Entry> entries = new LinkedList<>();
+
+        long count = topicsToNumEntries.get(topicSchemaName);
+        for (int i=0 ; i < count; i++) {
+
+            Foo foo = new Foo();
+            foo.field1 = (int) count;
+            foo.field2 = String.valueOf(count);
+            foo.field3 = count;
+            foo.field4 = count;
+            foo.field5 = count % 2 == 0;
+            foo.field6 = count;
+            foo.timestamp = System.currentTimeMillis();
+
+            LocalTime now = LocalTime.now(ZoneId.systemDefault());
+            foo.time = now.toSecondOfDay() * 1000;
+
+            LocalDate localDate = LocalDate.now();
+            LocalDate epoch = LocalDate.ofEpochDay(0);
+            foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, 
localDate));
+
+            PulsarApi.MessageMetadata messageMetadata = 
PulsarApi.MessageMetadata.newBuilder()
+                    .setProducerName("test-producer").setSequenceId(i)
+                    .setPublishTime(currentTimeMs + i).build();
+
+            Schema schema = topicsToSchemas.get(topicSchemaName).getType() == 
SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+
+            org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
+                    = 
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
+
+            ByteBuf byteBuf = 
serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, 
payload);
+
+            Entry entry = EntryImpl.create(0, i, byteBuf);
+            log.info("create entry: %s", entry.getEntryId());
+            entries.add(entry);
+        }
+        return entries;
+    }
+
     public long completedBytes = 0L;
 
     private static final Logger log = Logger.get(TestPulsarConnector.class);
@@ -464,12 +509,18 @@ public SchemaInfo answer(InvocationOnMock 
invocationOnMock) throws Throwable {
             public ReadOnlyCursor answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                 Object[] args = invocationOnMock.getArguments();
                 String topic = (String) args[0];
-                positions.put(topic, 0);
+                PositionImpl positionImpl = (PositionImpl) args[1];
+
+                int position = positionImpl.getEntryId() == -1 ? 0 : (int) 
positionImpl.getEntryId();
+
+                positions.put(topic, position);
                 String schemaName = TopicName.get(
                         TopicName.get(
                                 topic.replaceAll("/persistent", ""))
                                 .getPartitionedTopicName()).getSchemaName();
-                long entries = topicsToEntries.get(schemaName);
+                long entries = topicsToNumEntries.get(schemaName);
+
+
                 ReadOnlyCursor readOnlyCursor = mock(ReadOnlyCursor.class);
                 doReturn(entries).when(readOnlyCursor).getNumberOfEntries();
 
@@ -545,6 +596,43 @@ public Boolean answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                     }
                 });
 
+                when(readOnlyCursor.findNewestMatching(any(), any())).then(new 
Answer<Position>() {
+                    @Override
+                    public Position answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                        Object[] args = invocationOnMock.getArguments();
+                        
org.apache.pulsar.shade.com.google.common.base.Predicate<Entry> predicate
+                                = 
(org.apache.pulsar.shade.com.google.common.base.Predicate<Entry>) args[1];
+
+                        String schemaName = TopicName.get(
+                                TopicName.get(
+                                        topic.replaceAll("/persistent", ""))
+                                        
.getPartitionedTopicName()).getSchemaName();
+                        List<Entry> entries = getTopicEntries(schemaName);
+
+                        Integer target = null;
+                        for (int i=entries.size() - 1; i >= 0; i--) {
+                            Entry entry = entries.get(i);
+                            if (predicate.apply(entry)) {
+                                target = i;
+                                break;
+                            }
+                        }
+
+                        return target == null ? null : new PositionImpl(0, 
target);
+                    }
+                });
+
+                when(readOnlyCursor.getNumberOfEntries(any())).then(new 
Answer<Long>() {
+                    @Override
+                    public Long answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                        Object[] args = invocationOnMock.getArguments();
+                        
org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl>  range
+                                = 
(org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> ) 
args[0];
+
+                        return (range.upperEndpoint().getEntryId() + 1) - 
range.lowerEndpoint().getEntryId();
+                    }
+                });
+
                 return readOnlyCursor;
             }
         });
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 8d983ed95a..74126a61a5 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -71,7 +71,7 @@ public void testTopics() throws Exception {
                 }
                 count++;
             }
-            Assert.assertEquals(count, 
topicsToEntries.get(topicName.getSchemaName()).longValue());
+            Assert.assertEquals(count, 
topicsToNumEntries.get(topicName.getSchemaName()).longValue());
             Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), 
completedBytes);
             cleanup();
         }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index decdc13181..7c9b4bb1c0 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.predicate.ValueSet;
+import com.facebook.presto.spi.type.TimeZoneKey;
 import io.airlift.log.Logger;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -30,10 +36,14 @@
 import org.testng.annotations.Test;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static 
com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static 
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -69,10 +79,10 @@ public void testTopic() throws Exception {
                     topicName.getNamespace(),
                     topicName.getLocalName(),
                     topicName.getLocalName());
-            PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle);
+            PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
 
             final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new 
ResultCaptor<>();
-            
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
 any(), any(), any());
+            
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
 any(), any(), any(), any());
 
 
             ConnectorSplitSource connectorSplitSource = 
this.pulsarSplitManager.getSplits(
@@ -80,7 +90,7 @@ public void testTopic() throws Exception {
                     pulsarTableLayoutHandle, null);
 
             verify(this.pulsarSplitManager, times(1))
-                    .getSplitsNonPartitionedTopic(anyInt(), any(), any(), 
any());
+                    .getSplitsNonPartitionedTopic(anyInt(), any(), any(), 
any(), any());
 
             int totalSize = 0;
             for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
@@ -100,7 +110,7 @@ public void testTopic() throws Exception {
                 totalSize += pulsarSplit.getSplitSize();
             }
 
-            Assert.assertEquals(totalSize, 
topicsToEntries.get(topicName.getSchemaName()).intValue());
+            Assert.assertEquals(totalSize, 
topicsToNumEntries.get(topicName.getSchemaName()).intValue());
             cleanup();
         }
 
@@ -115,16 +125,16 @@ public void testPartitionedTopic() throws Exception {
                     topicName.getNamespace(),
                     topicName.getLocalName(),
                     topicName.getLocalName());
-            PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle);
+            PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
 
             final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new 
ResultCaptor<>();
-            
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(),
 any(), any(), any());
+            
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(),
 any(), any(), any(), any());
 
             
this.pulsarSplitManager.getSplits(mock(ConnectorTransactionHandle.class), 
mock(ConnectorSession.class),
                     pulsarTableLayoutHandle, null);
 
             verify(this.pulsarSplitManager, times(1))
-                    .getSplitsPartitionedTopic(anyInt(), any(), any(), any());
+                    .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), 
any());
 
             int partitions = 
partitionedTopicsToPartitions.get(topicName.toString());
 
@@ -148,7 +158,7 @@ public void testPartitionedTopic() throws Exception {
                     totalSize += pulsarSplit.getSplitSize();
                 }
 
-                Assert.assertEquals(totalSize, 
topicsToEntries.get(topicName.getSchemaName()).intValue());
+                Assert.assertEquals(totalSize, 
topicsToNumEntries.get(topicName.getSchemaName()).intValue());
             }
 
             cleanup();
@@ -165,5 +175,126 @@ public boolean test(PulsarSplit pulsarSplit) {
             }
         }).collect(Collectors.toList());
     }
+
+    @Test
+    public void testPublishTimePredicatePushdown() throws Exception {
+
+        TopicName topicName = TOPIC_1;
+
+        setup();
+        log.info("!----- topic: %s -----!", topicName);
+        PulsarTableHandle pulsarTableHandle = new 
PulsarTableHandle(pulsarConnectorId.toString(),
+                topicName.getNamespace(),
+                topicName.getLocalName(),
+                topicName.getLocalName());
+
+
+        Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+        Domain domain = 
Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, 
packDateTimeWithZone
+                (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, 
packDateTimeWithZone(currentTimeMs + 50L,
+                TimeZoneKey.UTC_KEY), true)), false);
+        
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(),
 false), domain);
+        TupleDomain<ColumnHandle> tupleDomain = 
TupleDomain.withColumnDomains(domainMap);
+
+        PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+
+        final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new 
ResultCaptor<>();
+        
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
 any(), any(), any
+                (), any());
+
+        ConnectorSplitSource connectorSplitSource = 
this.pulsarSplitManager.getSplits(
+                mock(ConnectorTransactionHandle.class), 
mock(ConnectorSession.class),
+                pulsarTableLayoutHandle, null);
+
+
+        verify(this.pulsarSplitManager, times(1))
+                .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), 
any());
+
+        int totalSize = 0;
+        int initalStart = 1;
+        for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
+            Assert.assertEquals(pulsarSplit.getConnectorId(), 
pulsarConnectorId.toString());
+            Assert.assertEquals(pulsarSplit.getSchemaName(), 
topicName.getNamespace());
+            Assert.assertEquals(pulsarSplit.getTableName(), 
topicName.getLocalName());
+            Assert.assertEquals(pulsarSplit.getSchema(),
+                    new 
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
+            Assert.assertEquals(pulsarSplit.getSchemaType(), 
topicsToSchemas.get(topicName.getSchemaName()).getType());
+            Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), 
initalStart);
+            Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+            Assert.assertEquals(pulsarSplit.getStartPosition(), 
PositionImpl.get(0, initalStart));
+            Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+            Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), 
initalStart + pulsarSplit.getSplitSize());
+            Assert.assertEquals(pulsarSplit.getEndPosition(), 
PositionImpl.get(0, initalStart + pulsarSplit
+                    .getSplitSize()));
+
+            initalStart += pulsarSplit.getSplitSize();
+            totalSize += pulsarSplit.getSplitSize();
+        }
+        Assert.assertEquals(totalSize, 49);
+
+    }
+
+    @Test
+    public void testPublishTimePredicatePushdownPartitionedTopic() throws 
Exception {
+
+        TopicName topicName = PARTITIONED_TOPIC_1;
+
+        setup();
+        log.info("!----- topic: %s -----!", topicName);
+        PulsarTableHandle pulsarTableHandle = new 
PulsarTableHandle(pulsarConnectorId.toString(),
+                topicName.getNamespace(),
+                topicName.getLocalName(),
+                topicName.getLocalName());
+
+
+        Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+        Domain domain = 
Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, 
packDateTimeWithZone
+                (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, 
packDateTimeWithZone(currentTimeMs + 50L,
+                TimeZoneKey.UTC_KEY), true)), false);
+        
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(),
 false), domain);
+        TupleDomain<ColumnHandle> tupleDomain = 
TupleDomain.withColumnDomains(domainMap);
+
+        PulsarTableLayoutHandle pulsarTableLayoutHandle = new 
PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+
+        final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new 
ResultCaptor<>();
+        doAnswer(resultCaptor).when(this.pulsarSplitManager)
+                .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), 
any());
+
+        ConnectorSplitSource connectorSplitSource = 
this.pulsarSplitManager.getSplits(
+                mock(ConnectorTransactionHandle.class), 
mock(ConnectorSession.class),
+                pulsarTableLayoutHandle, null);
+
+
+        verify(this.pulsarSplitManager, times(1))
+                .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), 
any());
+
+
+        int partitions = 
partitionedTopicsToPartitions.get(topicName.toString());
+        for (int i = 0; i < partitions; i++) {
+            List<PulsarSplit> splits = 
getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
+            int totalSize = 0;
+            int initalStart = 1;
+            for (PulsarSplit pulsarSplit : splits) {
+                Assert.assertEquals(pulsarSplit.getConnectorId(), 
pulsarConnectorId.toString());
+                Assert.assertEquals(pulsarSplit.getSchemaName(), 
topicName.getNamespace());
+                Assert.assertEquals(pulsarSplit.getTableName(), 
topicName.getPartition(i).getLocalName());
+                Assert.assertEquals(pulsarSplit.getSchema(),
+                        new 
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
+                Assert.assertEquals(pulsarSplit.getSchemaType(), 
topicsToSchemas.get(topicName.getSchemaName()).getType());
+                Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), 
initalStart);
+                Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+                Assert.assertEquals(pulsarSplit.getStartPosition(), 
PositionImpl.get(0, initalStart));
+                Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+                Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), 
initalStart + pulsarSplit.getSplitSize());
+                Assert.assertEquals(pulsarSplit.getEndPosition(), 
PositionImpl.get(0, initalStart + pulsarSplit.getSplitSize()));
+
+                initalStart += pulsarSplit.getSplitSize();
+                totalSize += pulsarSplit.getSplitSize();
+            }
+
+            Assert.assertEquals(totalSize, 49);
+        }
+    }
+
     
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to