This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-20015 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d962696caf0fe25c6b2304db7e8459929c98061e Author: amashenkov <[email protected]> AuthorDate: Wed Aug 23 16:33:12 2023 +0300 Fix conflicts. --- .../sql/engine/exec/DestinationFactory.java | 12 +- .../sql/engine/exec/LogicalRelImplementor.java | 3 +- .../sql/engine/trait/DistributionFunction.java | 31 +---- .../ignite/internal/sql/engine/trait/Identity.java | 6 +- .../IdentityDistributionFunctionSelfTest.java | 123 ------------------- .../exec/IdentityDistributionFunctionSelfTest.java | 136 +++++++++++++++++++++ .../planner/ColocatedHashAggregatePlannerTest.java | 3 +- .../planner/ColocatedSortAggregatePlannerTest.java | 3 +- .../planner/MapReduceSortAggregatePlannerTest.java | 3 +- .../sql/engine/planner/SetOpPlannerTest.java | 1 - 10 files changed, 158 insertions(+), 163 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java index ff350add2a..d34dc34b5d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.sql.engine.trait.AllNodes; import org.apache.ignite.internal.sql.engine.trait.Destination; import org.apache.ignite.internal.sql.engine.trait.DistributionFunction; import org.apache.ignite.internal.sql.engine.trait.DistributionFunction.AffinityDistribution; +import org.apache.ignite.internal.sql.engine.trait.Identity; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.Partitioned; import org.apache.ignite.internal.sql.engine.trait.RandomNode; @@ -43,16 +44,19 @@ import org.apache.ignite.internal.util.IgniteUtils; * physical representation - {@link Destination} function. */ class DestinationFactory<RowT> { + private final RowHandler<RowT> rowHandler; private final HashFunctionFactory<RowT> hashFunctionFactory; private final ResolvedDependencies dependencies; /** * Constructor. * + * @param rowHandler Row handler. * @param hashFunctionFactory Hash-function factory required to resolve hash-based distributions. * @param dependencies Dependencies required to resolve row value dependent distributions. */ - DestinationFactory(HashFunctionFactory<RowT> hashFunctionFactory, ResolvedDependencies dependencies) { + DestinationFactory(RowHandler<RowT> rowHandler, HashFunctionFactory<RowT> hashFunctionFactory, ResolvedDependencies dependencies) { + this.rowHandler = rowHandler; this.hashFunctionFactory = hashFunctionFactory; this.dependencies = dependencies; } @@ -83,6 +87,12 @@ class DestinationFactory<RowT> { case HASH_DISTRIBUTED: { ImmutableIntList keys = distribution.getKeys(); + if ("identity".equals(function.name())) { + assert !nullOrEmpty(group.nodeNames()) && !nullOrEmpty(keys) && keys.size() == 1; + + return new Identity<>(rowHandler, keys.get(0), group.nodeNames()); + } + assert !nullOrEmpty(group.assignments()) && !nullOrEmpty(keys); List<List<String>> assignments = Commons.transform(group.assignments(), v -> Commons.transform(v, NodeWithTerm::name)); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index f6743ac2d5..4cce19e8aa 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -109,7 +109,6 @@ import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.trait.Destination; -import org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.TraitUtils; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; @@ -157,7 +156,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> this.resolvedDependencies = resolvedDependencies; expressionFactory = ctx.expressionFactory(); - destinationFactory = new DestinationFactory<>(hashFuncFactory, resolvedDependencies); + destinationFactory = new DestinationFactory<>(ctx.rowHandler(), hashFuncFactory, resolvedDependencies); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java index 3aa65f11cb..340c304b9c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java @@ -21,13 +21,6 @@ import java.util.Objects; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistribution.Type; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.util.ImmutableIntList; -import org.apache.ignite.internal.sql.engine.exec.RowHandler; -import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; -import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm; -import org.apache.ignite.internal.sql.engine.util.Commons; -import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory; -import org.apache.ignite.internal.util.IgniteUtils; /** * Distribution function. @@ -225,7 +218,7 @@ public abstract class DistributionFunction { } /** - * Distribution function, which treats column value as a destination. + * Distribution function, which treats column's raw value as a destination. */ public static final class IdentityDistribution extends DistributionFunction { public static final DistributionFunction INSTANCE = new IdentityDistribution(); @@ -241,27 +234,5 @@ public abstract class DistributionFunction { protected String name0() { return "identity"; } - - /** {@inheritDoc} */ - @Override - <RowT> Destination<RowT> destination(HashFunctionFactory<RowT> hashFuncFactory, ColocationGroup group, ImmutableIntList keys) { - //TODO: IGNITE-20246 Fix method signature. RowHandler should be here instead of HashFunctionFactory. - throw new UnsupportedOperationException("Not implemented yet."); - } - - /** - * Creates a destination based on column raw value, given nodes mapping and given distribution keys. - * - * @param rowHandler Handler to access row values. - * @param group Target mapping. - * @param keys Distribution keys. Single key is expected. - * @return Destination function. - */ - public <RowT> Destination<RowT> destination(RowHandler<RowT> rowHandler, ColocationGroup group, ImmutableIntList keys) { - assert keys.size() == 1; - int key = keys.getInt(0); - - return new Identity(rowHandler, key, group.nodeNames()); - } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Identity.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Identity.java index adb68ac79d..3558580c8d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Identity.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Identity.java @@ -45,10 +45,10 @@ public class Identity<RowT> implements Destination<RowT> { /** {@inheritDoc} */ @Override public List<String> targets(RowT row) { - String node = (String) rowHandler.get(columnIndex, row); + Object node = rowHandler.get(columnIndex, row); - if (nodes.contains(node)) { - return List.of(node); + if (node instanceof String && nodes.contains(node)) { + return List.of((String) node); } throw new IllegalStateException("No target found."); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IdentityDistributionFunctionSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IdentityDistributionFunctionSelfTest.java deleted file mode 100644 index b223dcfb3f..0000000000 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IdentityDistributionFunctionSelfTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - -import java.util.List; -import java.util.Map; -import org.apache.calcite.rel.RelDistribution.Type; -import org.apache.calcite.util.ImmutableIntList; -import org.apache.calcite.util.mapping.Mappings; -import org.apache.ignite.internal.schema.NativeTypes; -import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler; -import org.apache.ignite.internal.sql.engine.exec.RowHandler; -import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; -import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; -import org.apache.ignite.internal.sql.engine.trait.Destination; -import org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; -import org.hamcrest.Matchers; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -/** - * Identity distribution function self test. - */ -public class IdentityDistributionFunctionSelfTest { - - private static final String NODE_1 = "node1"; - private static final String NODE_2 = "node2"; - private static final String NODE_3 = "node3"; - - private final RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; - - private final RowSchema rowSchema = RowSchema.builder() - .addField(NativeTypes.STRING) - .addField(NativeTypes.STRING) - .build(); - - private final ColocationGroup colocationGroup = ColocationGroup.forNodes(List.of(NODE_1, NODE_2, NODE_3)); - - @Test - public void distributionFunction() { - IdentityDistribution function = new IdentityDistribution(); - - assertThat(function.type(), equalTo(Type.HASH_DISTRIBUTED)); - assertThat(function.affinity(), is(false)); - assertThat(function.name(), equalTo("identity")); - - Destination<Object[]> destination = function.destination(rowHandler, colocationGroup, ImmutableIntList.of(0)); - - assertThat(destination.targets().size(), equalTo(3)); - assertThat(destination.targets(), containsInAnyOrder(NODE_1, NODE_2, NODE_3)); - } - - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20246") - @Test - public void distributionTrait() { - Object[] row = rowHandler.factory(rowSchema).create(NODE_1, "Ne prikhodya v soznanie"); - Object[] otherRow = rowHandler.factory(rowSchema).create("UNKNOWN", NODE_2); - - IgniteDistribution distribution = IgniteDistributions.identity(0); - Destination<Object> destination = distribution.destination(null, colocationGroup); - - // Valid row. - assertThat(destination.targets(row).size(), equalTo(1)); - assertThat(destination.targets(row), Matchers.contains(NODE_1)); - - // Invalid row. - assertThat(destination.targets(otherRow), empty()); - - // Apply mapping to function to satisfy otherRow. - distribution = distribution.apply(Mappings.target(Map.of(0, 1, 1, 0), 2, 2)); - destination = distribution.destination(null, colocationGroup); - - assertThat(destination.targets(otherRow).size(), equalTo(1)); - assertThat(destination.targets(otherRow), Matchers.contains(NODE_2)); - - } - - @Test - public void destination() { - Object[] row = rowHandler.factory(rowSchema).create(NODE_1, "Ne prikhodya v soznanie"); - Object[] otherRow = rowHandler.factory(rowSchema).create("UNKNOWN", NODE_2); - - IdentityDistribution function = new IdentityDistribution(); - Destination<Object[]> destination = function.destination(rowHandler, colocationGroup, ImmutableIntList.of(0)); - - // Valid row. - assertThat(destination.targets(row).size(), equalTo(1)); - assertThat(destination.targets(row), Matchers.contains(NODE_1)); - - // Invalid row. - Assertions.assertThrows(IllegalStateException.class, () -> destination.targets(otherRow)); - - // Apply mapping to function to satisfy otherRow. - Destination<Object[]> remappedDestination = function.destination(rowHandler, colocationGroup, ImmutableIntList.of(1)); - - assertThat(remappedDestination.targets(otherRow).size(), equalTo(1)); - assertThat(remappedDestination.targets(otherRow), Matchers.contains(NODE_2)); - } -} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java new file mode 100644 index 0000000000..2b46f736b7 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +import java.util.Collection; +import java.util.List; +import org.apache.calcite.rel.RelDistribution.Type; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; +import org.apache.ignite.internal.sql.engine.trait.Destination; +import org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution; +import org.apache.ignite.internal.sql.engine.trait.Identity; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Identity distribution function self test. + */ +public class IdentityDistributionFunctionSelfTest { + + private static final String NODE_1 = "node1"; + private static final String NODE_2 = "node2"; + private static final String NODE_3 = "node3"; + + private final RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; + + private final RowSchema rowSchema = RowSchema.builder() + .addField(NativeTypes.STRING) + .addField(NativeTypes.STRING) + .build(); + + private final ColocationGroup colocationGroup = ColocationGroup.forNodes(List.of(NODE_1, NODE_2, NODE_3)); + private final DestinationFactory<Object[]> destinationFactory = new DestinationFactory<>(rowHandler, null, null); + + @Test + public void identityDistributionTrait() { + IdentityDistribution function = new IdentityDistribution(); + + assertThat(function.type(), equalTo(Type.HASH_DISTRIBUTED)); + assertThat(function.affinity(), is(false)); + assertThat(function.name(), equalTo("identity")); + } + + @Test + public void destinationTargets() { + Destination<Object[]> destination = destinationFactory.createDestination(IgniteDistributions.identity(0), colocationGroup); + + assertThat(destination, instanceOf(Identity.class)); + + Collection<String> targets = destination.targets(); + + assertThat(targets.size(), equalTo(3)); + assertThat(targets, containsInAnyOrder(NODE_1, NODE_2, NODE_3)); + } + + @Test + public void destinationRowTargets() { + Object[] row = rowHandler.factory(rowSchema).create(NODE_1, "Ne prikhodya v soznanie"); + + Destination<Object[]> destination0 = destinationFactory.createDestination(IgniteDistributions.identity(0), colocationGroup); + Destination<Object[]> destination1 = destinationFactory.createDestination(IgniteDistributions.identity(1), colocationGroup); + + assertThat(destination0, instanceOf(Identity.class)); + + // Valid row has single target. + Collection<String> targets = destination0.targets(row); + + assertThat(targets.size(), equalTo(1)); + assertThat(targets, Matchers.contains(NODE_1)); + + // Validate column mapping + Object[] otherRow = rowHandler.factory(rowSchema).create("UNKNOWN", NODE_2); + targets = destination1.targets(otherRow); + + assertThat(targets.size(), equalTo(1)); + assertThat(targets, Matchers.contains(NODE_2)); + + // Check wrong mapping. + Assertions.assertThrows(IllegalStateException.class, () -> destination0.targets(otherRow)); + } + + @Test + public void destinationForInvalidRow() { + Destination<Object[]> destination = destinationFactory.createDestination(IgniteDistributions.identity(0), colocationGroup); + + Object[] invalidRow1 = rowHandler.factory(rowSchema).create("UNKNOWN", NODE_2); + Assertions.assertThrows(IllegalStateException.class, () -> destination.targets(invalidRow1)); + + Object[] invalidRow2 = rowHandler.factory(rowSchema).create("", NODE_2); + Assertions.assertThrows(IllegalStateException.class, () -> destination.targets(invalidRow2)); + + Object[] invalidRow3 = rowHandler.factory(rowSchema).create(null, NODE_2); + Assertions.assertThrows(IllegalStateException.class, () -> destination.targets(invalidRow3)); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java index 54b09ca0a1..54fe13c0bb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java @@ -37,7 +37,8 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.junit.jupiter.api.Test; /** - * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of colocated hash aggregates only. + * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of + * colocated hash aggregates only. * * <p>See {@link AbstractAggregatePlannerTest base class} for more details. */ diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java index 64ab03edc1..4c66544ec5 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java @@ -37,7 +37,8 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.junit.jupiter.api.Test; /** - * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of colocated sort aggregates only. + * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of + * colocated sort aggregates only. * * <p>See {@link AbstractAggregatePlannerTest base class} for more details. */ diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java index d0538762f4..a02267ab11 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java @@ -42,7 +42,8 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.junit.jupiter.api.Test; /** - * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of 2 phase sort aggregates only. + * This test verifies that queries defined in {@link TestCase TestCase} can be optimized with usage of + * 2 phase sort aggregates only. * * <p>See {@link AbstractAggregatePlannerTest base class} for more details. */ diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java index 2c2a0318b4..519f8b7dad 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java @@ -520,7 +520,6 @@ public class SetOpPlannerTest extends AbstractPlannerTest { ); } - /** * Tests two SET operations (with ALL flag enabled for the first one) on tables with affinity and random distribution. *
