srkukarni closed pull request #2414: allow predicates concerning publish time
to push down to pulsar
URL: https://github.com/apache/incubator-pulsar/pull/2414
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 584a376fac..a1187b8255 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -395,7 +395,7 @@ void asyncSkipEntries(int numEntriesToSkip,
IndividualDeletedEntries deletedEntr
final SkipEntriesCallback callback, Object ctx);
/**
- * Find the newest entry that matches the given predicate.
+ * Find the newest entry that matches the given predicate. Will only
search among active entries
*
* @param condition
* predicate that reads an entry an applies a condition
@@ -405,9 +405,25 @@ void asyncSkipEntries(int numEntriesToSkip,
IndividualDeletedEntries deletedEntr
*/
Position findNewestMatching(Predicate<Entry> condition) throws
InterruptedException, ManagedLedgerException;
+
+ /**
+ * Find the newest entry that matches the given predicate.
+ *
+ * @param constraint
+ * search only active entries or all entries
+ * @param condition
+ * predicate that reads an entry an applies a condition
+ * @return Position of the newest entry that matches the given predicate
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ Position findNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+
/**
* Find the newest entry that matches the given predicate.
*
+ * @param constraint
+ * search only active entries or all entries
* @param condition
* predicate that reads an entry an applies a condition
* @param callback
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 995bdf0c76..10997427cb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -20,14 +20,16 @@
import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Range;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
public interface ReadOnlyCursor {
/**
* Read entries from the ManagedLedger, up to the specified number. The
returned list can be smaller.
*
- * @param numberOfEntriesToRead
- * maximum number of entries to return
+ * @param numberOfEntriesToRead maximum number of entries to return
* @return the list of entries
* @throws ManagedLedgerException
*/
@@ -36,13 +38,10 @@
/**
* Asynchronously read entries from the ManagedLedger.
*
+ * @param numberOfEntriesToRead maximum number of entries to return
+ * @param callback callback object
+ * @param ctx opaque context
* @see #readEntries(int)
- * @param numberOfEntriesToRead
- * maximum number of entries to return
- * @param callback
- * callback object
- * @param ctx
- * opaque context
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback
callback, Object ctx);
@@ -55,7 +54,7 @@
/**
* Tells whether this cursor has already consumed all the available
entries.
- *
+ * <p>
* <p/>
* This method is not blocking.
*
@@ -65,7 +64,7 @@
/**
* Return the number of messages that this cursor still has to read.
- *
+ * <p>
* <p/>
* This method has linear time complexity on the number of ledgers
included in the managed ledger.
*
@@ -76,11 +75,31 @@
/**
* Skip n entries from the read position of this cursor.
*
- * @param numEntriesToSkip
- * number of entries to skip
+ * @param numEntriesToSkip number of entries to skip
*/
void skipEntries(int numEntriesToSkip);
+ /**
+ * Find the newest entry that matches the given predicate.
+ *
+ * @param constraint search only active entries or all entries
+ * @param condition predicate that reads an entry an applies a condition
+ * @return Position of the newest entry that matches the given predicate
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ Position findNewestMatching(ManagedCursor.FindPositionConstraint
constraint, Predicate<Entry> condition) throws InterruptedException,
ManagedLedgerException;
+
+ /**
+ * Return the number of messages that this cursor still has to read.
+ *
+ * <p/>This method has linear time complexity on the number of ledgers
included in the managed ledger.
+ *
+ * @param range the range between two positions
+ * @return the number of entries in range
+ */
+ long getNumberOfEntries(Range<PositionImpl> range);
+
/**
* Close the cursor and releases the associated resources.
*
@@ -92,10 +111,8 @@
/**
* Close the cursor asynchronously and release the associated resources.
*
- * @param callback
- * callback object
- * @param ctx
- * opaque context
+ * @param callback callback object
+ * @param ctx opaque context
*/
void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index bab354762a..50b47229d6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -687,6 +687,11 @@ public long getNumberOfEntriesInStorage() {
@Override
public Position findNewestMatching(Predicate<Entry> condition) throws
InterruptedException, ManagedLedgerException {
+ return findNewestMatching(FindPositionConstraint.SearchActiveEntries,
condition);
+ }
+
+ @Override
+ public Position findNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException
{
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedLedgerException exception = null;
@@ -694,7 +699,7 @@ public Position findNewestMatching(Predicate<Entry>
condition) throws Interrupte
}
final Result result = new Result();
- asyncFindNewestMatching(FindPositionConstraint.SearchActiveEntries,
condition, new FindEntryCallback() {
+ asyncFindNewestMatching(constraint, condition, new FindEntryCallback()
{
@Override
public void findEntryComplete(Position position, Object ctx) {
result.position = position;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index df0c08c707..c1a2216b4b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -32,7 +32,7 @@
public class ReadOnlyCursorImpl extends ManagedCursorImpl implements
ReadOnlyCursor {
public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig
config, ManagedLedgerImpl ledger,
- PositionImpl startPosition, String cursorName) {
+ PositionImpl startPosition, String cursorName) {
super(bookkeeper, config, ledger, cursorName);
if (startPosition.equals(PositionImpl.earliest)) {
@@ -47,7 +47,7 @@ public ReadOnlyCursorImpl(BookKeeper bookkeeper,
ManagedLedgerConfig config, Man
messagesConsumedCounter =
-getNumberOfEntries(Range.closed(readPosition, ledger.getLastPosition()));
}
- state = State.NoLedger;
+ this.state = State.NoLedger;
}
@Override
@@ -62,4 +62,7 @@ public void asyncClose(final AsyncCallbacks.CloseCallback
callback, final Object
callback.closeComplete(ctx);
}
+ public long getNumberOfEntries(Range<PositionImpl> range) {
+ return this.ledger.getNumberOfEntries(range);
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 8e3ea7f119..052aa0e771 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -21,9 +21,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import com.google.common.collect.Range;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
@@ -35,6 +38,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+@Slf4j
public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory,
BookKeeper bookKeeper, MetaStore store,
@@ -124,6 +128,22 @@ private ReadOnlyCursor createReadOnlyCursor(PositionImpl
startPosition) {
return cursor;
}
+ @Override
+ void asyncReadEntry(PositionImpl position,
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
+ this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger)
-> {
+ this.entryCache.asyncReadEntry(ledger, position, callback,
ctx);
+ }).exceptionally((ex) -> {
+ log.error("[{}] Error opening ledger for reading at position
{} - {}", new Object[]{this.name, position, ex.getMessage()});
+
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
ctx);
+ return null;
+ });
+ }
+
+ @Override
+ public long getNumberOfEntries() {
+ return getNumberOfEntries(Range.openClosed(PositionImpl.earliest,
getLastPosition()));
+ }
+
protected boolean isReadOnly() {
return true;
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index ecf6acfc50..3e404d9be0 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -197,6 +197,11 @@ public Position findNewestMatching(Predicate<Entry>
condition)
return null;
}
+ @Override
+ public Position findNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException
{
+ return null;
+ }
+
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 0d391a6a73..8341775971 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -20,6 +20,7 @@
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -132,8 +133,9 @@ public Object getData(Message message) {
public static final PulsarInternalColumn EVENT_TIME = new
EventTimeColumn("__event_time__", TimestampType
.TIMESTAMP, "Application defined timestamp in milliseconds of when
the event occurred");
- public static final PulsarInternalColumn PUBLISH_TIME = new
PublishTimeColumn("__publish_time__", TimestampType
- .TIMESTAMP, "The timestamp in milliseconds of when event as
published");
+ public static final PulsarInternalColumn PUBLISH_TIME = new
PublishTimeColumn("__publish_time__",
+ TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE,
+ "The timestamp in milliseconds of when event as published");
public static final PulsarInternalColumn MESSAGE_ID = new
MessageIdColumn("__message_id__", VarcharType.VARCHAR,
"The message ID of the message used to generate this row");
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index c04156eca7..20a56bb7bd 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -32,11 +32,14 @@
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.RealType;
+import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
@@ -62,6 +65,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
@@ -116,8 +121,9 @@ public ConnectorTableHandle getTableHandle(ConnectorSession
session, SchemaTable
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession
session, ConnectorTableHandle table,
Constraint<ColumnHandle> constraint,
Optional<Set<ColumnHandle>> desiredColumns) {
+
PulsarTableHandle handle = convertTableHandle(table);
- ConnectorTableLayout layout = new ConnectorTableLayout(new
PulsarTableLayoutHandle(handle));
+ ConnectorTableLayout layout = new ConnectorTableLayout(new
PulsarTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout,
constraint.getSummary()));
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 3523228763..8593c55ccf 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,6 +20,7 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
@@ -55,12 +56,15 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static
com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static com.facebook.presto.spi.type.DateTimeEncoding.unpackMillisUtc;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -264,7 +268,7 @@ public long getLong(int field) {
if (type.equals(BIGINT)) {
return ((Number) record).longValue();
} else if (type.equals(DATE)) {
- return MILLISECONDS.toDays(new
Date(TimeUnit.DAYS.toMillis(((Number) record).longValue())).getTime());
+ return ((Number) record).longValue();
} else if (type.equals(INTEGER)) {
return (int) record;
} else if (type.equals(REAL)) {
@@ -272,9 +276,11 @@ public long getLong(int field) {
} else if (type.equals(SMALLINT)) {
return ((Number) record).shortValue();
} else if (type.equals(TIME)) {
- return new Time(((Number) record).longValue()).getTime();
+ return ((Number) record).longValue();
} else if (type.equals(TIMESTAMP)) {
- return new Timestamp(((Number) record).longValue()).getTime();
+ return ((Number) record).longValue();
+ } else if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
+ return packDateTimeWithZone(((Number) record).longValue(), 0);
} else if (type.equals(TINYINT)) {
return Byte.parseByte(record.toString());
} else {
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 580eb479c8..4bd1171ac0 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.sql.presto;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
@@ -43,7 +45,7 @@
private final long endPositionEntryId;
private final long startPositionLedgerId;
private final long endPositionLedgerId;
-
+ private final TupleDomain<ColumnHandle> tupleDomain;
@JsonCreator
public PulsarSplit(
@@ -57,7 +59,8 @@ public PulsarSplit(
@JsonProperty("startPositionEntryId") long startPositionEntryId,
@JsonProperty("endPositionEntryId") long endPositionEntryId,
@JsonProperty("startPositionLedgerId") long startPositionLedgerId,
- @JsonProperty("endPositionLedgerId") long endPositionLedgerId) {
+ @JsonProperty("endPositionLedgerId") long endPositionLedgerId,
+ @JsonProperty("tupleDomain") TupleDomain<ColumnHandle>
tupleDomain) {
this.splitId = splitId;
this.schemaName = requireNonNull(schemaName, "schema name is null");
this.connectorId = requireNonNull(connectorId, "connector id is null");
@@ -69,6 +72,7 @@ public PulsarSplit(
this.endPositionEntryId = endPositionEntryId;
this.startPositionLedgerId = startPositionLedgerId;
this.endPositionLedgerId = endPositionLedgerId;
+ this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
}
@JsonProperty
@@ -126,6 +130,11 @@ public long getEndPositionLedgerId() {
return endPositionLedgerId;
}
+ @JsonProperty
+ public TupleDomain<ColumnHandle> getTupleDomain() {
+ return tupleDomain;
+ }
+
public PositionImpl getStartPosition() {
return PositionImpl.get(startPositionLedgerId, startPositionEntryId);
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 721b8e6d88..488609d7a3 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -18,14 +18,21 @@
*/
package org.apache.pulsar.sql.presto;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -35,9 +42,11 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.com.google.common.base.Predicate;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import javax.inject.Inject;
@@ -46,7 +55,9 @@
import java.util.LinkedList;
import java.util.List;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import static
org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
public class PulsarSplitManager implements ConnectorSplitManager {
@@ -79,6 +90,7 @@ public ConnectorSplitSource
getSplits(ConnectorTransactionHandle transactionHand
PulsarTableLayoutHandle layoutHandle = (PulsarTableLayoutHandle)
layout;
PulsarTableHandle tableHandle = layoutHandle.getTable();
+ TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
TopicName topicName = TopicName.get("persistent",
NamespaceName.get(tableHandle.getSchemaName()),
tableHandle.getTableName());
@@ -95,10 +107,10 @@ public ConnectorSplitSource
getSplits(ConnectorTransactionHandle transactionHand
Collection<PulsarSplit> splits;
try {
if (!PulsarConnectorUtils.isPartitionedTopic(topicName,
this.pulsarAdmin)) {
- splits = getSplitsNonPartitionedTopic(numSplits, topicName,
tableHandle, schemaInfo);
+ splits = getSplitsNonPartitionedTopic(numSplits, topicName,
tableHandle, schemaInfo, tupleDomain);
log.debug("Splits for non-partitioned topic %s: %s",
topicName, splits);
} else {
- splits = getSplitsPartitionedTopic(numSplits, topicName,
tableHandle, schemaInfo);
+ splits = getSplitsPartitionedTopic(numSplits, topicName,
tableHandle, schemaInfo, tupleDomain);
log.debug("Splits for partitioned topic %s: %s", topicName,
splits);
}
} catch (Exception e) {
@@ -119,7 +131,7 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
@VisibleForTesting
Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName
topicName, PulsarTableHandle
- tableHandle, SchemaInfo schemaInfo) throws Exception {
+ tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle>
tupleDomain) throws Exception {
int numPartitions;
try {
numPartitions =
(this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
@@ -148,7 +160,8 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
splitsForThisPartition,
tableHandle,
schemaInfo,
- topicName.getPartition(i).getLocalName())
+ topicName.getPartition(i).getLocalName(),
+ tupleDomain)
);
}
return splits;
@@ -163,12 +176,38 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
}
}
+ @VisibleForTesting
+ Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits,
TopicName topicName, PulsarTableHandle
+ tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle>
tupleDomain) throws Exception {
+ ManagedLedgerFactory managedLedgerFactory = null;
+ try {
+ managedLedgerFactory = getManagedLedgerFactory();
+
+ return getSplitsForTopic(
+ topicName.getPersistenceNamingEncoding(),
+ managedLedgerFactory,
+ numSplits,
+ tableHandle,
+ schemaInfo,
+ tableHandle.getTableName(), tupleDomain);
+ } finally {
+ if (managedLedgerFactory != null) {
+ try {
+ managedLedgerFactory.shutdown();
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ }
+ }
+
@VisibleForTesting
Collection<PulsarSplit> getSplitsForTopic(String
topicNamePersistenceEncoding,
ManagedLedgerFactory
managedLedgerFactory,
int numSplits,
PulsarTableHandle tableHandle,
- SchemaInfo schemaInfo, String
tableName)
+ SchemaInfo schemaInfo, String
tableName,
+ TupleDomain<ColumnHandle>
tupleDomain)
throws ManagedLedgerException, InterruptedException {
ReadOnlyCursor readOnlyCursor = null;
@@ -176,10 +215,33 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
topicNamePersistenceEncoding,
PositionImpl.earliest, new ManagedLedgerConfig());
+
long numEntries = readOnlyCursor.getNumberOfEntries();
if (numEntries <= 0) {
return Collections.EMPTY_LIST;
}
+
+ PredicatePushdownInfo predicatePushdownInfo =
PredicatePushdownInfo.getPredicatePushdownInfo(
+ this.connectorId,
+ tupleDomain,
+ managedLedgerFactory,
+ topicNamePersistenceEncoding,
+ numEntries);
+
+ PositionImpl initialStartPosition;
+ if (predicatePushdownInfo != null) {
+ numEntries = predicatePushdownInfo.getNumOfEntries();
+ initialStartPosition =
predicatePushdownInfo.getStartPosition();
+ } else {
+ initialStartPosition = (PositionImpl)
readOnlyCursor.getReadPosition();
+ }
+
+
+ readOnlyCursor.close();
+ readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
+ topicNamePersistenceEncoding,
+ initialStartPosition, new ManagedLedgerConfig());
+
long remainder = numEntries % numSplits;
long avgEntriesPerSplit = numEntries / numSplits;
@@ -200,7 +262,8 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
startPosition.getEntryId(),
endPosition.getEntryId(),
startPosition.getLedgerId(),
- endPosition.getLedgerId()));
+ endPosition.getLedgerId(),
+ tupleDomain));
}
return splits;
} finally {
@@ -214,28 +277,119 @@ ManagedLedgerFactory getManagedLedgerFactory() throws
Exception {
}
}
- @VisibleForTesting
- Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits,
TopicName topicName, PulsarTableHandle
- tableHandle, SchemaInfo schemaInfo) throws Exception {
- ManagedLedgerFactory managedLedgerFactory = null;
- try {
- managedLedgerFactory = getManagedLedgerFactory();
+ @Data
+ private static class PredicatePushdownInfo {
+ private PositionImpl startPosition;
+ private PositionImpl endPosition;
+ private long numOfEntries;
- return getSplitsForTopic(
- topicName.getPersistenceNamingEncoding(),
- managedLedgerFactory,
- numSplits,
- tableHandle,
- schemaInfo,
- tableHandle.getTableName());
- } finally {
- if (managedLedgerFactory != null) {
+ private PredicatePushdownInfo(PositionImpl startPosition, PositionImpl
endPosition, long numOfEntries) {
+ this.startPosition = startPosition;
+ this.endPosition = endPosition;
+ this.numOfEntries = numOfEntries;
+ }
+
+ public static PredicatePushdownInfo getPredicatePushdownInfo(String
connectorId,
+
TupleDomain<ColumnHandle> tupleDomain,
+
ManagedLedgerFactory managedLedgerFactory,
+ String
topicNamePersistenceEncoding,
+ long
totalNumEntries) throws
+ ManagedLedgerException, InterruptedException {
+
+ ReadOnlyCursor readOnlyCursor = null;
+ try {
+ readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
+ topicNamePersistenceEncoding,
+ PositionImpl.earliest, new ManagedLedgerConfig());
+
+ if (tupleDomain.getDomains().isPresent()) {
+ Domain domain =
tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
+ .getColumnHandle(connectorId, false));
+ if (domain != null) {
+ // TODO support arbitrary number of ranges
+ // only worry about one range for now
+ if (domain.getValues().getRanges().getRangeCount() ==
1) {
+
+ checkArgument(domain.getType().isOrderable(),
"Domain type must be orderable");
+
+ Long upperBoundTs = null;
+ Long lowerBoundTs = null;
+
+ Range range =
domain.getValues().getRanges().getOrderedRanges().get(0);
+
+ if (!range.getHigh().isUpperUnbounded()) {
+ upperBoundTs = new
SqlTimestampWithTimeZone(range.getHigh().getValueBlock().get()
+ .getLong(0, 0)).getMillisUtc();
+ }
+
+ if (!range.getLow().isLowerUnbounded()) {
+ lowerBoundTs = new
SqlTimestampWithTimeZone(range.getLow().getValueBlock().get()
+ .getLong(0, 0)).getMillisUtc();
+ }
+
+ PositionImpl overallStartPos;
+ if (lowerBoundTs == null) {
+ overallStartPos = (PositionImpl)
readOnlyCursor.getReadPosition();
+ } else {
+ overallStartPos = findPosition(readOnlyCursor,
lowerBoundTs);
+ }
+
+ PositionImpl overallEndPos;
+ if (upperBoundTs == null) {
+
+
readOnlyCursor.skipEntries(Math.toIntExact(totalNumEntries));
+ overallEndPos = (PositionImpl)
readOnlyCursor.getReadPosition();
+ } else {
+ overallEndPos = findPosition(readOnlyCursor,
upperBoundTs);
+ }
+
+ // Just use a close bound since presto can always
filter out the extra entries even if
+ // the bound
+ // should be open or a mixture of open and closed
+
org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> posRange
+ =
org.apache.pulsar.shade.com.google.common.collect.Range.range(overallStartPos,
+
org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED,
+ overallEndPos,
org.apache.pulsar.shade.com.google.common.collect.BoundType.CLOSED);
+
+ long numOfEntries =
readOnlyCursor.getNumberOfEntries(posRange) - 1;
+
+ PredicatePushdownInfo predicatePushdownInfo
+ = new
PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
+ log.debug("Predicate pushdown optimization
calculated: %s", predicatePushdownInfo);
+ return predicatePushdownInfo;
+ }
+ }
+ }
+ } finally {
+ if (readOnlyCursor != null) {
+ readOnlyCursor.close();
+ }
+ }
+ return null;
+ }
+ }
+
+ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor,
long timestamp) throws
+ ManagedLedgerException,
+ InterruptedException {
+ return (PositionImpl)
readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new
Predicate<Entry>() {
+ @Override
+ public boolean apply(Entry entry) {
+ MessageImpl msg = null;
try {
- managedLedgerFactory.shutdown();
+ msg = MessageImpl.deserialize(entry.getDataBuffer());
+
+ return msg.getPublishTime() <= timestamp;
} catch (Exception e) {
- log.error(e);
+ log.error(e, "Failed To deserialize message when finding
position with error: %s", e);
+ } finally {
+ entry.release();
+ if (msg != null) {
+ msg.recycle();
+ }
}
+ return false;
}
- }
+ });
}
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
index 7bdbddad53..a7c240b940 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
@@ -18,18 +18,26 @@
*/
package org.apache.pulsar.sql.presto;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+
import static java.util.Objects.requireNonNull;
public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
private final PulsarTableHandle table;
+ private final TupleDomain<ColumnHandle> tupleDomain;
@JsonCreator
- public PulsarTableLayoutHandle(@JsonProperty("table") PulsarTableHandle
table) {
+ public PulsarTableLayoutHandle(@JsonProperty("table") PulsarTableHandle
table,
+ @JsonProperty("tupleDomain")
TupleDomain<ColumnHandle> domain) {
+
this.table = requireNonNull(table, "table is null");
+ this.tupleDomain = requireNonNull(domain, "tupleDomain is null");
}
@JsonProperty
@@ -37,8 +45,35 @@ public PulsarTableHandle getTable() {
return table;
}
+ @JsonProperty
+ public TupleDomain<ColumnHandle> getTupleDomain()
+ {
+ return tupleDomain;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o;
+ return Objects.equals(table, that.table) &&
+ Objects.equals(tupleDomain, that.tupleDomain);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(table, tupleDomain);
+ }
+
@Override
- public String toString() {
+ public String toString()
+ {
return table.toString();
}
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index a3fa12d31b..4da7ea57f5 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.sql.presto;
+import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.DoubleType;
@@ -28,6 +29,7 @@
import io.airlift.log.Logger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -78,6 +80,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -86,6 +89,8 @@
@Test(singleThreaded = true)
public abstract class TestPulsarConnector {
+ protected static final long currentTimeMs = 1534806330000L;
+
protected PulsarConnectorConfig pulsarConnectorConfig;
protected PulsarMetadata pulsarMetadata;
@@ -104,7 +109,7 @@
protected static List<TopicName> partitionedTopicNames;
protected static Map<String, Integer> partitionedTopicsToPartitions;
protected static Map<String, SchemaInfo> topicsToSchemas;
- protected static Map<String, Long> topicsToEntries;
+ protected static Map<String, Long> topicsToNumEntries;
protected static final NamespaceName NAMESPACE_NAME_1 =
NamespaceName.get("tenant-1", "ns-1");
protected static final NamespaceName NAMESPACE_NAME_2 =
NamespaceName.get("tenant-1", "ns-2");
@@ -274,19 +279,19 @@ public void setDate(int date) {
fooTypes.put("time", TIME);
fooTypes.put("date", DATE);
- topicsToEntries = new HashMap<>();
- topicsToEntries.put(TOPIC_1.getSchemaName(), 1233L);
- topicsToEntries.put(TOPIC_2.getSchemaName(), 0L);
- topicsToEntries.put(TOPIC_3.getSchemaName(), 100L);
- topicsToEntries.put(TOPIC_4.getSchemaName(), 12345L);
- topicsToEntries.put(TOPIC_5.getSchemaName(), 8000L);
- topicsToEntries.put(TOPIC_6.getSchemaName(), 1L);
- topicsToEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
- topicsToEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
- topicsToEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
- topicsToEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
- topicsToEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
- topicsToEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
+ topicsToNumEntries = new HashMap<>();
+ topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
+ topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
+ topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
+ topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
+ topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
+ topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
fooColumnHandles = new LinkedList<>();
for (int i = 0; i < Foo.class.getDeclaredFields().length; i++) {
@@ -315,14 +320,54 @@ public PulsarColumnHandle apply(PulsarInternalColumn
pulsarInternalColumn) {
for (TopicName topicName : allTopics) {
splits.put(topicName, new PulsarSplit(0,
pulsarConnectorId.toString(),
topicName.getNamespace(), topicName.getLocalName(),
- topicsToEntries.get(topicName.getSchemaName()),
+ topicsToNumEntries.get(topicName.getSchemaName()),
new
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
topicsToSchemas.get(topicName.getSchemaName()).getType(),
- 0, topicsToEntries.get(topicName.getSchemaName()),
- 0, 0));
+ 0, topicsToNumEntries.get(topicName.getSchemaName()),
+ 0, 0, TupleDomain.all()));
}
}
+ private static List<Entry> getTopicEntries(String topicSchemaName) {
+ List<Entry> entries = new LinkedList<>();
+
+ long count = topicsToNumEntries.get(topicSchemaName);
+ for (int i=0 ; i < count; i++) {
+
+ Foo foo = new Foo();
+ foo.field1 = (int) count;
+ foo.field2 = String.valueOf(count);
+ foo.field3 = count;
+ foo.field4 = count;
+ foo.field5 = count % 2 == 0;
+ foo.field6 = count;
+ foo.timestamp = System.currentTimeMillis();
+
+ LocalTime now = LocalTime.now(ZoneId.systemDefault());
+ foo.time = now.toSecondOfDay() * 1000;
+
+ LocalDate localDate = LocalDate.now();
+ LocalDate epoch = LocalDate.ofEpochDay(0);
+ foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch,
localDate));
+
+ PulsarApi.MessageMetadata messageMetadata =
PulsarApi.MessageMetadata.newBuilder()
+ .setProducerName("test-producer").setSequenceId(i)
+ .setPublishTime(currentTimeMs + i).build();
+
+ Schema schema = topicsToSchemas.get(topicSchemaName).getType() ==
SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+
+ org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
+ =
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
+
+ ByteBuf byteBuf =
serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
payload);
+
+ Entry entry = EntryImpl.create(0, i, byteBuf);
+ log.info("create entry: %s", entry.getEntryId());
+ entries.add(entry);
+ }
+ return entries;
+ }
+
public long completedBytes = 0L;
private static final Logger log = Logger.get(TestPulsarConnector.class);
@@ -464,12 +509,18 @@ public SchemaInfo answer(InvocationOnMock
invocationOnMock) throws Throwable {
public ReadOnlyCursor answer(InvocationOnMock invocationOnMock)
throws Throwable {
Object[] args = invocationOnMock.getArguments();
String topic = (String) args[0];
- positions.put(topic, 0);
+ PositionImpl positionImpl = (PositionImpl) args[1];
+
+ int position = positionImpl.getEntryId() == -1 ? 0 : (int)
positionImpl.getEntryId();
+
+ positions.put(topic, position);
String schemaName = TopicName.get(
TopicName.get(
topic.replaceAll("/persistent", ""))
.getPartitionedTopicName()).getSchemaName();
- long entries = topicsToEntries.get(schemaName);
+ long entries = topicsToNumEntries.get(schemaName);
+
+
ReadOnlyCursor readOnlyCursor = mock(ReadOnlyCursor.class);
doReturn(entries).when(readOnlyCursor).getNumberOfEntries();
@@ -545,6 +596,43 @@ public Boolean answer(InvocationOnMock invocationOnMock)
throws Throwable {
}
});
+ when(readOnlyCursor.findNewestMatching(any(), any())).then(new
Answer<Position>() {
+ @Override
+ public Position answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ Object[] args = invocationOnMock.getArguments();
+
org.apache.pulsar.shade.com.google.common.base.Predicate<Entry> predicate
+ =
(org.apache.pulsar.shade.com.google.common.base.Predicate<Entry>) args[1];
+
+ String schemaName = TopicName.get(
+ TopicName.get(
+ topic.replaceAll("/persistent", ""))
+
.getPartitionedTopicName()).getSchemaName();
+ List<Entry> entries = getTopicEntries(schemaName);
+
+ Integer target = null;
+ for (int i=entries.size() - 1; i >= 0; i--) {
+ Entry entry = entries.get(i);
+ if (predicate.apply(entry)) {
+ target = i;
+ break;
+ }
+ }
+
+ return target == null ? null : new PositionImpl(0,
target);
+ }
+ });
+
+ when(readOnlyCursor.getNumberOfEntries(any())).then(new
Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ Object[] args = invocationOnMock.getArguments();
+
org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> range
+ =
(org.apache.pulsar.shade.com.google.common.collect.Range<PositionImpl> )
args[0];
+
+ return (range.upperEndpoint().getEntryId() + 1) -
range.lowerEndpoint().getEntryId();
+ }
+ });
+
return readOnlyCursor;
}
});
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 8d983ed95a..74126a61a5 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -71,7 +71,7 @@ public void testTopics() throws Exception {
}
count++;
}
- Assert.assertEquals(count,
topicsToEntries.get(topicName.getSchemaName()).longValue());
+ Assert.assertEquals(count,
topicsToNumEntries.get(topicName.getSchemaName()).longValue());
Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(),
completedBytes);
cleanup();
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index decdc13181..7c9b4bb1c0 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -18,9 +18,15 @@
*/
package org.apache.pulsar.sql.presto;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.predicate.ValueSet;
+import com.facebook.presto.spi.type.TimeZoneKey;
import io.airlift.log.Logger;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -30,10 +36,14 @@
import org.testng.annotations.Test;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static
com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
@@ -69,10 +79,10 @@ public void testTopic() throws Exception {
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
- PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle);
+ PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new
ResultCaptor<>();
-
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
any(), any(), any());
+
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
any(), any(), any(), any());
ConnectorSplitSource connectorSplitSource =
this.pulsarSplitManager.getSplits(
@@ -80,7 +90,7 @@ public void testTopic() throws Exception {
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
- .getSplitsNonPartitionedTopic(anyInt(), any(), any(),
any());
+ .getSplitsNonPartitionedTopic(anyInt(), any(), any(),
any(), any());
int totalSize = 0;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
@@ -100,7 +110,7 @@ public void testTopic() throws Exception {
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize,
topicsToEntries.get(topicName.getSchemaName()).intValue());
+ Assert.assertEquals(totalSize,
topicsToNumEntries.get(topicName.getSchemaName()).intValue());
cleanup();
}
@@ -115,16 +125,16 @@ public void testPartitionedTopic() throws Exception {
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
- PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle);
+ PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new
ResultCaptor<>();
-
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(),
any(), any(), any());
+
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(),
any(), any(), any(), any());
this.pulsarSplitManager.getSplits(mock(ConnectorTransactionHandle.class),
mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
- .getSplitsPartitionedTopic(anyInt(), any(), any(), any());
+ .getSplitsPartitionedTopic(anyInt(), any(), any(), any(),
any());
int partitions =
partitionedTopicsToPartitions.get(topicName.toString());
@@ -148,7 +158,7 @@ public void testPartitionedTopic() throws Exception {
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize,
topicsToEntries.get(topicName.getSchemaName()).intValue());
+ Assert.assertEquals(totalSize,
topicsToNumEntries.get(topicName.getSchemaName()).intValue());
}
cleanup();
@@ -165,5 +175,126 @@ public boolean test(PulsarSplit pulsarSplit) {
}
}).collect(Collectors.toList());
}
+
+ @Test
+ public void testPublishTimePredicatePushdown() throws Exception {
+
+ TopicName topicName = TOPIC_1;
+
+ setup();
+ log.info("!----- topic: %s -----!", topicName);
+ PulsarTableHandle pulsarTableHandle = new
PulsarTableHandle(pulsarConnectorId.toString(),
+ topicName.getNamespace(),
+ topicName.getLocalName(),
+ topicName.getLocalName());
+
+
+ Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+ Domain domain =
Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE,
packDateTimeWithZone
+ (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true,
packDateTimeWithZone(currentTimeMs + 50L,
+ TimeZoneKey.UTC_KEY), true)), false);
+
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(),
false), domain);
+ TupleDomain<ColumnHandle> tupleDomain =
TupleDomain.withColumnDomains(domainMap);
+
+ PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+
+ final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new
ResultCaptor<>();
+
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(),
any(), any(), any
+ (), any());
+
+ ConnectorSplitSource connectorSplitSource =
this.pulsarSplitManager.getSplits(
+ mock(ConnectorTransactionHandle.class),
mock(ConnectorSession.class),
+ pulsarTableLayoutHandle, null);
+
+
+ verify(this.pulsarSplitManager, times(1))
+ .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(),
any());
+
+ int totalSize = 0;
+ int initalStart = 1;
+ for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
+ Assert.assertEquals(pulsarSplit.getConnectorId(),
pulsarConnectorId.toString());
+ Assert.assertEquals(pulsarSplit.getSchemaName(),
topicName.getNamespace());
+ Assert.assertEquals(pulsarSplit.getTableName(),
topicName.getLocalName());
+ Assert.assertEquals(pulsarSplit.getSchema(),
+ new
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
+ Assert.assertEquals(pulsarSplit.getSchemaType(),
topicsToSchemas.get(topicName.getSchemaName()).getType());
+ Assert.assertEquals(pulsarSplit.getStartPositionEntryId(),
initalStart);
+ Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ Assert.assertEquals(pulsarSplit.getStartPosition(),
PositionImpl.get(0, initalStart));
+ Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ Assert.assertEquals(pulsarSplit.getEndPositionEntryId(),
initalStart + pulsarSplit.getSplitSize());
+ Assert.assertEquals(pulsarSplit.getEndPosition(),
PositionImpl.get(0, initalStart + pulsarSplit
+ .getSplitSize()));
+
+ initalStart += pulsarSplit.getSplitSize();
+ totalSize += pulsarSplit.getSplitSize();
+ }
+ Assert.assertEquals(totalSize, 49);
+
+ }
+
+ @Test
+ public void testPublishTimePredicatePushdownPartitionedTopic() throws
Exception {
+
+ TopicName topicName = PARTITIONED_TOPIC_1;
+
+ setup();
+ log.info("!----- topic: %s -----!", topicName);
+ PulsarTableHandle pulsarTableHandle = new
PulsarTableHandle(pulsarConnectorId.toString(),
+ topicName.getNamespace(),
+ topicName.getLocalName(),
+ topicName.getLocalName());
+
+
+ Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+ Domain domain =
Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE,
packDateTimeWithZone
+ (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true,
packDateTimeWithZone(currentTimeMs + 50L,
+ TimeZoneKey.UTC_KEY), true)), false);
+
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(),
false), domain);
+ TupleDomain<ColumnHandle> tupleDomain =
TupleDomain.withColumnDomains(domainMap);
+
+ PulsarTableLayoutHandle pulsarTableLayoutHandle = new
PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+
+ final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new
ResultCaptor<>();
+ doAnswer(resultCaptor).when(this.pulsarSplitManager)
+ .getSplitsPartitionedTopic(anyInt(), any(), any(), any(),
any());
+
+ ConnectorSplitSource connectorSplitSource =
this.pulsarSplitManager.getSplits(
+ mock(ConnectorTransactionHandle.class),
mock(ConnectorSession.class),
+ pulsarTableLayoutHandle, null);
+
+
+ verify(this.pulsarSplitManager, times(1))
+ .getSplitsPartitionedTopic(anyInt(), any(), any(), any(),
any());
+
+
+ int partitions =
partitionedTopicsToPartitions.get(topicName.toString());
+ for (int i = 0; i < partitions; i++) {
+ List<PulsarSplit> splits =
getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
+ int totalSize = 0;
+ int initalStart = 1;
+ for (PulsarSplit pulsarSplit : splits) {
+ Assert.assertEquals(pulsarSplit.getConnectorId(),
pulsarConnectorId.toString());
+ Assert.assertEquals(pulsarSplit.getSchemaName(),
topicName.getNamespace());
+ Assert.assertEquals(pulsarSplit.getTableName(),
topicName.getPartition(i).getLocalName());
+ Assert.assertEquals(pulsarSplit.getSchema(),
+ new
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
+ Assert.assertEquals(pulsarSplit.getSchemaType(),
topicsToSchemas.get(topicName.getSchemaName()).getType());
+ Assert.assertEquals(pulsarSplit.getStartPositionEntryId(),
initalStart);
+ Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ Assert.assertEquals(pulsarSplit.getStartPosition(),
PositionImpl.get(0, initalStart));
+ Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ Assert.assertEquals(pulsarSplit.getEndPositionEntryId(),
initalStart + pulsarSplit.getSplitSize());
+ Assert.assertEquals(pulsarSplit.getEndPosition(),
PositionImpl.get(0, initalStart + pulsarSplit.getSplitSize()));
+
+ initalStart += pulsarSplit.getSplitSize();
+ totalSize += pulsarSplit.getSplitSize();
+ }
+
+ Assert.assertEquals(totalSize, 49);
+ }
+ }
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services