gianm commented on code in PR #16533:
URL: https://github.com/apache/druid/pull/16533#discussion_r1710181883
##########
processing/src/main/java/org/apache/druid/segment/CursorFactory.java:
##########
@@ -32,35 +33,59 @@
* Interface extended by {@link StorageAdapter}, which gives them the power to create cursors.
*
* @see StorageAdapter
+ *
+ * @deprecated This interface is deprecated and no longer implemented by any built-in {@link StorageAdapter}. Callers
+ * should use {@link CursorMakerFactory#makeCursorHolder(CursorBuildSpec)} instead. Implementors should implement
+ * {@link CursorMakerFactory#makeCursorHolder(CursorBuildSpec)} instead.
*/
+@Deprecated
Review Comment:
I have a feeling we will be evolving the `CursorHolderFactory` interfaces
over the next couple of releases as we build out projections, and possibly
adjust how granularity and sort ordering are handled. So IMO it's too early to
deprecate this interface. Extension `Query` and `Segment` implementations
can/should keep using `CursorFactory` until the new interfaces stabilize.
##########
processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java:
##########
@@ -45,26 +42,21 @@ public FilteredStorageAdapter(final StorageAdapter adapter, final DimFilter filt
}
@Override
- public Sequence<Cursor> makeCursors(
- @Nullable Filter filter,
- Interval interval,
- VirtualColumns virtualColumns,
- Granularity gran,
- boolean descending,
- @Nullable QueryMetrics<?> queryMetrics
- )
+ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
- final Filter andFilter;
- if (filter == null) {
+    final CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder = CursorBuildSpec.builder(spec);
+ final Filter newFilter;
+ if (spec.getFilter() == null) {
if (filterOnDataSource != null) {
Review Comment:
Odd that this handles the case where `filterOnDataSource == null`, but the
other branch doesn't. Perhaps it can't be null, and we can simplify this code
by removing the null check.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.scan.Order;
+import org.apache.druid.query.scan.OrderBy;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+  public static final CursorBuildSpec FULL_SCAN = CursorBuildSpec.builder().build();
+
+ public static CursorBuildSpecBuilder builder()
+ {
+ return new CursorBuildSpecBuilder();
+ }
+
+ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+ {
+ return new CursorBuildSpecBuilder(spec);
+ }
+
+ @Nullable
+ private final Filter filter;
+ private final Interval interval;
+ @Nullable
+ private final List<String> groupingColumns;
+ private final VirtualColumns virtualColumns;
+ @Nullable
+ private final List<AggregatorFactory> aggregators;
+ @Nullable
+ private final List<OrderBy> orderByColumns;
+
+ private final QueryContext queryContext;
+
+ @Nullable
+ private final QueryMetrics<?> queryMetrics;
+
+ public CursorBuildSpec(
+ @Nullable Filter filter,
+ Interval interval,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns,
+ @Nullable List<AggregatorFactory> aggregators,
+ @Nullable List<OrderBy> preferredOrdering,
+ QueryContext queryContext,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ this.filter = filter;
+ this.interval = interval;
+ this.groupingColumns = groupingColumns;
+ this.virtualColumns = virtualColumns;
+ this.aggregators = aggregators;
+ this.orderByColumns = preferredOrdering;
+ this.queryContext = queryContext;
+ this.queryMetrics = queryMetrics;
+ }
+
+ @Nullable
+ public Filter getFilter()
+ {
+ return filter;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @Nullable
+ public List<String> getGroupingColumns()
+ {
+ return groupingColumns;
+ }
+
+ public VirtualColumns getVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
+ @Nullable
+ public List<AggregatorFactory> getAggregators()
+ {
+ return aggregators;
+ }
+
+ @Nullable
+ public List<OrderBy> getPreferredOrdering()
+ {
+ return orderByColumns;
+ }
+
+ public QueryContext getQueryContext()
+ {
+ return queryContext;
+ }
+
+ @Nullable
+ public QueryMetrics<?> getQueryMetrics()
+ {
+ return queryMetrics;
+ }
+
+  public static boolean preferDescendingTimeOrder(@Nullable List<OrderBy> preferredOrdering)
+ {
+ if (preferredOrdering != null && preferredOrdering.size() == 1) {
Review Comment:
This returns `false` for a preferred ordering that starts with `__time` but
also has some other stuff after it. Is that good?
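To make the distinction concrete, here is a contrast between the exact-singleton check and a first-entry-only variant, using a minimal stand-in for `OrderBy` (hypothetical types, not the Druid classes):

```java
import java.util.List;

class TimeOrderSketch
{
  static final String TIME = "__time";

  // Minimal stand-in for org.apache.druid.query.scan.OrderBy.
  record Ord(String column, boolean descending) {}

  // Current behavior: only an ordering of exactly [__time DESC] qualifies.
  static boolean exactSingleton(List<Ord> ordering)
  {
    return ordering != null
           && ordering.size() == 1
           && TIME.equals(ordering.get(0).column())
           && ordering.get(0).descending();
  }

  // Possible alternative: look only at the leading entry, so
  // [__time DESC, x ASC] would also prefer descending time order.
  static boolean firstEntryOnly(List<Ord> ordering)
  {
    return ordering != null
           && !ordering.isEmpty()
           && TIME.equals(ordering.get(0).column())
           && ordering.get(0).descending();
  }
}
```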
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,264 @@
+  public static boolean preferDescendingTimeOrder(@Nullable List<OrderBy> preferredOrdering)
+ {
+ if (preferredOrdering != null && preferredOrdering.size() == 1) {
+ final OrderBy orderBy = Iterables.getOnlyElement(preferredOrdering);
+      return ColumnHolder.TIME_COLUMN_NAME.equals(orderBy.getColumnName()) && Order.DESCENDING == orderBy.getOrder();
+ }
+ return false;
+ }
+
+ public static class CursorBuildSpecBuilder
+ {
+ @Nullable
+ private Filter filter;
+ private Interval interval = Intervals.ETERNITY;
+
+ @Nullable
+ private List<String> groupingColumns;
+ private VirtualColumns virtualColumns = VirtualColumns.EMPTY;
+ @Nullable
+ private List<AggregatorFactory> aggregators;
+ @Nullable
+ private List<OrderBy> preferredOrdering;
+
+ private QueryContext queryContext = QueryContext.empty();
+ @Nullable
+ private QueryMetrics<?> queryMetrics;
+
+ private CursorBuildSpecBuilder()
+ {
+ // initialize with defaults
+ }
+
+ private CursorBuildSpecBuilder(CursorBuildSpec buildSpec)
+ {
+ this.filter = buildSpec.filter;
+ this.interval = buildSpec.interval;
+ this.groupingColumns = buildSpec.groupingColumns;
+ this.virtualColumns = buildSpec.virtualColumns;
+ this.aggregators = buildSpec.aggregators;
+ this.preferredOrdering = buildSpec.orderByColumns;
+ this.queryContext = buildSpec.queryContext;
+ this.queryMetrics = buildSpec.queryMetrics;
+ }
+
+ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter)
+ {
+ this.filter = filter;
+ return this;
+ }
+
+ public CursorBuildSpecBuilder setInterval(Interval interval)
+ {
+ this.interval = interval;
+ return this;
+ }
+
+ public CursorBuildSpecBuilder setGroupingAndVirtualColumns(
+ Granularity granularity,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns
+ )
+ {
+      final VirtualColumn granularityVirtual = Granularities.toVirtualColumn(granularity);
Review Comment:
It seems a little odd that the builder is deciding what the name and
specific contents of the virtual column will be.
I think we can leave this for now; I'm happy enough that the
`CursorBuildSpec` itself doesn't have a `granularity` field. But IMO it'd make
more sense to push the `granularity` all the way to the query engines, so they
build the virtual column rather than the `CursorBuildSpecBuilder` doing it. In
tandem with that we should also do this stuff:
- Adjust the `timestamp_floor` implementation to take advantage of runs in
the `__time` column. Rather than doing `granularity.bucketStart` on each row,
it should keep some state about the current bucket start/end and compare the
`__time` to that current start/end.
- Adjust the Granularizer logic to do equality checks on the virtual
granularity column, rather than greater/less-than comparisons. With the virtual
column always reporting the exact start of the bucket, equality is good enough.
That should enable query engines to use the granularity virtual column
approach 100% of the time without suffering a perf loss.
We can do all this later, though. If you agree, feel free to keep things the
way they are in this patch; just leave a comment here to note the future
change. Also write in the javadoc for this particular method that it's unstable
and currently unused, so callers in extensions should stick to
`setVirtualColumns` (which actually does do something today).
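For the first bullet, the run-aware `timestamp_floor` could look roughly like this, for a fixed-width bucket (a simplified stand-in; real granularities also handle time zones and irregular periods):

```java
// Hypothetical sketch of bucket-run caching: only recompute the bucket when
// __time leaves the current one, so time-sorted data pays one comparison
// per row instead of a full bucketStart computation.
class BucketFloorSketch
{
  static long[] floorAll(long[] times, long bucketSize)
  {
    final long[] out = new long[times.length];
    long bucketStart = Long.MAX_VALUE;
    long bucketEnd = Long.MIN_VALUE;
    for (int i = 0; i < times.length; i++) {
      final long t = times[i];
      if (t < bucketStart || t >= bucketEnd) {
        // __time left the cached bucket; compute the new start/end.
        bucketStart = Math.floorDiv(t, bucketSize) * bucketSize;
        bucketEnd = bucketStart + bucketSize;
      }
      out[i] = bucketStart;
    }
    return out;
  }
}
```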
##########
processing/src/test/java/org/apache/druid/frame/segment/FrameStorageAdapterTest.java:
##########
@@ -345,58 +367,46 @@ public void tearDown()
}
@Test
- public void test_makeCursors()
+ public void test_makeCursor()
{
- assertCursorsMatch(
- adapter ->
- adapter.makeCursors(
- filter,
- interval,
- virtualColumns,
-            Granularities.ALL, // Frames only support Granularities.ALL: no point testing the others.
- descending,
- null
- )
- );
+ assertCursorMatch(adapter -> adapter.makeCursorHolder(buildSpec));
}
@Test
public void test_makeVectorCursor()
{
-    Assume.assumeTrue(frameAdapter.canVectorize(filter, virtualColumns, descending));
-
- assertVectorCursorsMatch(
- adapter ->
- adapter.makeVectorCursor(
- filter,
- interval,
- virtualColumns,
- descending,
- VECTOR_SIZE,
- null
- )
- );
+ assertVectorCursorsMatch(adapter -> adapter.makeCursorHolder(buildSpec));
}
-  private void assertCursorsMatch(final Function<StorageAdapter, Sequence<Cursor>> call)
+  private void assertCursorMatch(final Function<StorageAdapter, CursorHolder> call)
{
final RowSignature signature = frameAdapter.getRowSignature();
-    final Sequence<List<Object>> queryableRows =
-        call.apply(queryableAdapter).flatMap(cursor -> FrameTestUtil.readRowsFromCursor(cursor, signature));
-    final Sequence<List<Object>> frameRows =
-        call.apply(frameAdapter).flatMap(cursor -> FrameTestUtil.readRowsFromCursor(advanceAndReset(cursor), signature));
- FrameTestUtil.assertRowsEqual(queryableRows, frameRows);
+    try (final CursorHolder queryableMaker = call.apply(queryableAdapter);
+         final CursorHolder frameMaker = call.apply(frameAdapter)) {
+      final Sequence<List<Object>> queryableRows =
+          FrameTestUtil.readRowsFromCursor(queryableMaker.asCursor(), signature);
+      final Sequence<List<Object>> frameRows =
+          FrameTestUtil.readRowsFromCursor(frameMaker.asCursor(), signature);
Review Comment:
After this change, `advanceAndReset(Cursor)` is no longer used. Is that
intentional? If not, please add it back in; it's meant to be a no-op that helps
test that `reset()` works properly. (Unless you've added some other tests that
exercise `reset()` better.)
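A minimal stand-in illustrating the kind of no-op helper described above (hypothetical types; the real test uses Druid's `Cursor`):

```java
import java.util.List;

class AdvanceAndResetSketch
{
  interface Cursor
  {
    boolean isDone();
    void advance();
    void reset();
  }

  // Simple list-backed cursor used to demonstrate the helper.
  static class ListCursor implements Cursor
  {
    private final List<String> rows;
    private int i = 0;

    ListCursor(List<String> rows) { this.rows = rows; }

    @Override public boolean isDone() { return i >= rows.size(); }
    @Override public void advance() { i++; }
    @Override public void reset() { i = 0; }

    String current() { return rows.get(i); }
  }

  // Walks the cursor to the end, then resets it: a logical no-op that
  // fails loudly if reset() does not actually rewind the cursor.
  static Cursor advanceAndReset(Cursor cursor)
  {
    while (!cursor.isDone()) {
      cursor.advance();
    }
    cursor.reset();
    return cursor;
  }
}
```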
##########
processing/src/main/java/org/apache/druid/segment/CursorMakerFactory.java:
##########
@@ -0,0 +1,25 @@
+package org.apache.druid.segment;
+
+public interface CursorMakerFactory
Review Comment:
`CursorHolderFactory`
##########
processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java:
##########
@@ -140,4 +148,12 @@ private void advanceToMatchingRow()
rowId++;
}
}
+
+ private void advanceToMatchingMarkRow()
+ {
+ while (!isDone() && rowId < markId) {
Review Comment:
This should be checking `valueMatcher` somehow, right? I think as written,
it will advance `markId` number of rows on the underlying `rowWalker`, without
regard for which ones match the provided filter. (Normally, `rowId` is
incremented once for each row that matches the filter.)
Additionally, this logic creates a requirement that `rowWalker` be
immutable. I don't think that'll always be true. For example, we use this
cursor on top of external data, where re-walking the walker means re-fetching a
file. The file could have changed. We also use it on top of lookups, where
re-walking could get a different set of keys. I'm not really sure how to do
sensible marking on data streams like this.
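A matcher-aware replay could be shaped like this (stand-in types, hypothetical; the real code would drive `rowWalker` and `valueMatcher`):

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch of the matcher-aware re-advance described above: rowId is
// incremented once per *matching* row, so the replay must skip
// non-matching rows rather than counting raw rows.
class MarkReplaySketch
{
  // Returns the physical position after replaying `markId` matching rows.
  static int advanceToMark(List<String> rows, Predicate<String> matcher, int markId)
  {
    int rowId = 0;
    int pos = 0;
    while (pos < rows.size() && rowId < markId) {
      if (matcher.test(rows.get(pos))) {
        rowId++;  // count only rows the filter accepts
      }
      pos++;
    }
    return pos;
  }
}
```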
##########
processing/src/main/java/org/apache/druid/query/scan/OrderBy.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.druid.query.scan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Objects;
+
+public class OrderBy
Review Comment:
Move outside `scan`? Possibly to the `org.apache.druid.query` package.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,264 @@
+  public static boolean preferDescendingTimeOrder(@Nullable List<OrderBy> preferredOrdering)
Review Comment:
This method could use some javadoc. Everything in this class could, really.
But this one especially, since it's not obvious from the name what it does.
##########
processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java:
##########
@@ -250,12 +244,32 @@ public boolean isDoneOrInterrupted()
return isDone() || Thread.currentThread().isInterrupted();
}
+ @Override
+ public void mark()
+ {
+ joinMarkId = joinColumnSelectorFactory.getRowId();
+ }
+
+ @Override
+ public void resetToMark()
+ {
+ leftCursor.reset();
+ joinMatcher.reset();
+ joinColumnSelectorFactory.resetRowId();
+ initialize();
+    while (!isDone() && joinColumnSelectorFactory.getRowId() < joinMarkId) {
Review Comment:
I wonder if this approach, where we fully reset and advance `leftCursor`,
will work properly on top of a mutable `leftCursor` such as
`IncrementalIndexCursor`. It seems like it would be safer to pass `mark`
and `resetToMark` through to the `leftCursor`.
Speaking of mutability, the rhs Joinable could also be mutable, like if it's
a lookup backed by a Kafka topic. We don't snapshot those. That's also
problematic for this logic, since it could make the `rowId` inconsistent
post-reset.
I don't have a good solution in mind for problems like this. The simplest
one I can think of right now is to side-step the problem by making it so not
all cursors support `mark()` (for example, a join cursor might not support it),
and then adjust TopN to use `runWithCardinalityUnknown` if the underlying
cursor doesn't support `mark()` and `params.getNumValuesPerPass()` is less than
`cardinality`.
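The pass-through idea might look roughly like this (hypothetical interfaces; the real `Cursor` API has more methods, and the join-side state to snapshot is more involved):

```java
// Sketch: delegate positioning to the left cursor instead of replaying
// rows, so a mutable left cursor stays consistent; only join-side state
// is snapshotted here. Stand-in types, not the Druid classes.
class PassThroughMarkSketch
{
  interface Markable
  {
    void mark();
    void resetToMark();
  }

  static class JoinCursor implements Markable
  {
    private final Markable leftCursor;
    private Object joinState;    // stand-in for join-matcher state
    private Object markedState;

    JoinCursor(Markable leftCursor) { this.leftCursor = leftCursor; }

    @Override
    public void mark()
    {
      leftCursor.mark();         // pass through, rather than recording a rowId
      markedState = joinState;
    }

    @Override
    public void resetToMark()
    {
      leftCursor.resetToMark();  // left cursor restores its own position
      joinState = markedState;
    }
  }
}
```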
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]