This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d5c265d933 IGNITE-19770 Add a mechanism to wait till a schema is
available via Schema Sync at a ts (#2402)
d5c265d933 is described below
commit d5c265d9335fed1ab424ac4a4021c380f24d6503
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 7 07:56:24 2023 +0400
IGNITE-19770 Add a mechanism to wait till a schema is available via Schema
Sync at a ts (#2402)
---
.github/PULL_REQUEST_TEMPLATE.md | 12 ++--
.../ignite/internal/catalog/CatalogService.java | 8 +--
.../ignite/internal/hlc/HybridTimestamp.java | 19 +++--
.../ignite/internal/hlc/HybridTimestampTest.java | 80 +++++++++++++++++++++
.../distributed/schema/SchemaSyncService.java | 44 ++++++++++++
.../distributed/schema/SchemaSyncServiceImpl.java | 54 ++++++++++++++
.../schema/SchemaSyncServiceImplTest.java | 84 ++++++++++++++++++++++
7 files changed, 287 insertions(+), 14 deletions(-)
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index e31ce5da18..0434205a85 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -1,19 +1,19 @@
Thank you for submitting the pull request.
-In order to streamline the review of the patch and ensure better code quality
-we ask both an author and a reviewer to check the following requirements:
+To streamline the review process of the patch and ensure better code quality
+we ask both an author and a reviewer to verify the following:
### The Review Checklist
-- [ ] **Formal criteria:** TC status, codestyle, mandatory documentation. Also
check the following list:
+- [ ] **Formal criteria:** TC status, codestyle, mandatory documentation. Also
make sure to complete the following:
\- There is a single JIRA ticket related to the pull request.
\- The web-link to the pull request is attached to the JIRA ticket.
\- The JIRA ticket has the Patch Available state.
-\- The description of the JIRA ticket explains WHAT and WHY was made and HOW.
+\- The description of the JIRA ticket explains WHAT was made, WHY and HOW.
\- The pull request title is treated as the final commit message. The
following pattern must be used: IGNITE-XXXX Change summary where XXXX - number
of JIRA issue.
- [ ] **Design:** new code conforms with the design principles of the
components it is added to.
-- [ ] **Patch quality:** patch cannot be split into a smaller pieces, its size
is reasonable.
+- [ ] **Patch quality:** patch cannot be split into smaller pieces, its size
must be reasonable.
- [ ] **Code quality:** code is clean and readable, necessary developer
documentation is added if needed.
-- [ ] **Tests code quality:** tests set covers positive/negative scenarios,
happy/edge cases. Tests are effective in terms of execution time and resources.
+- [ ] **Tests code quality:** test set covers positive/negative scenarios,
happy/edge cases. Tests are effective in terms of execution time and resources.
### Notes
- [Apache Ignite Coding
Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Java+Code+Style+Guide)
\ No newline at end of file
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c1d30f1b4c..66535869ba 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -57,17 +57,17 @@ public interface CatalogService {
Collection<CatalogIndexDescriptor> indexes(int catalogVersion);
- CatalogSchemaDescriptor schema(int version);
+ @Nullable CatalogSchemaDescriptor schema(int version);
- CatalogSchemaDescriptor schema(@Nullable String schemaName, int version);
+ @Nullable CatalogSchemaDescriptor schema(@Nullable String schemaName, int
version);
CatalogZoneDescriptor zone(String zoneName, long timestamp);
CatalogZoneDescriptor zone(int zoneId, long timestamp);
- CatalogSchemaDescriptor activeSchema(long timestamp);
+ @Nullable CatalogSchemaDescriptor activeSchema(long timestamp);
- CatalogSchemaDescriptor activeSchema(@Nullable String schemaName, long
timestamp);
+ @Nullable CatalogSchemaDescriptor activeSchema(@Nullable String
schemaName, long timestamp);
int activeCatalogVersion(long timestamp);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 07563b3438..4889133ffb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -236,12 +236,23 @@ public final class HybridTimestamp implements
Comparable<HybridTimestamp>, Seria
/**
* Returns a new hybrid timestamp with incremented physical component.
*/
- public HybridTimestamp addPhysicalTime(long mills) {
- if (mills >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
- throw new IllegalArgumentException("Physical time is out of
bounds: " + mills);
+ public HybridTimestamp addPhysicalTime(long millis) {
+ if (millis >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
+ throw new IllegalArgumentException("Physical time is out of
bounds: " + millis);
}
- return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
+ return new HybridTimestamp(time + (millis << LOGICAL_TIME_BITS_SIZE));
+ }
+
+ /**
+ * Returns a new hybrid timestamp whose physical component is decremented by the given number of milliseconds.
+ * The decrement is shifted left by {@code LOGICAL_TIME_BITS_SIZE} before being subtracted from {@code time},
+ * so the low {@code LOGICAL_TIME_BITS_SIZE} bits of {@code time} are not affected by the subtraction.
+ *
+ * <p>NOTE(review): only an upper bound on {@code millis} is checked here; a negative value passes the guard
+ * and would effectively be added. Confirm whether callers may pass negative values.
+ *
+ * <p>NOTE(review): the accompanying test expects an {@code IllegalArgumentException} starting with
+ * "Time is out of bounds: " on underflow; presumably that is raised by the constructor - verify.
+ *
+ * @param millis Milliseconds to subtract from the physical component; must be less than
+ * {@code 1L << PHYSICAL_TIME_BITS_SIZE}.
+ * @return New timestamp constructed from the decremented {@code time} value.
+ * @throws IllegalArgumentException If {@code millis} is greater than or equal to
+ * {@code 1L << PHYSICAL_TIME_BITS_SIZE}.
+ */
+ public HybridTimestamp subtractPhysicalTime(long millis) {
+ if (millis >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
+ throw new IllegalArgumentException("Physical time is out of
bounds: " + millis);
+ }
+
+ return new HybridTimestamp(time - (millis << LOGICAL_TIME_BITS_SIZE));
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
index 9a58e801cb..de2602c30c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.hlc;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.PHYSICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.max;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
@@ -64,4 +68,80 @@ class HybridTimestampTest {
void hashCodeSameWhenComponentsAreSame() {
assertEquals(new HybridTimestamp(1, 2).hashCode(), new
HybridTimestamp(1, 2).hashCode());
}
+
+ @Test
+ void addPhysicalTimeIncrementsPhysicalComponent() {
+ HybridTimestamp before = new HybridTimestamp(1, 2);
+
+ HybridTimestamp after = before.addPhysicalTime(1000);
+
+ assertThat(after.getPhysical(), is(1001L));
+ }
+
+ @Test
+ void addPhysicalTimeLeavesLogicalComponentIntact() {
+ HybridTimestamp before = new HybridTimestamp(1, 2);
+
+ HybridTimestamp after = before.addPhysicalTime(1000);
+
+ assertThat(after.getLogical(), is(2));
+ }
+
+ @Test
+ void addPhysicalTimeThrowsIfIncrementIsTooBig() {
+ HybridTimestamp before = new HybridTimestamp(1, 2);
+
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> before.addPhysicalTime(1L << PHYSICAL_TIME_BITS_SIZE)
+ );
+ assertThat(ex.getMessage(), is("Physical time is out of bounds:
281474976710656"));
+ }
+
+ @Test
+ void addPhysicalTimeThrowsIfOverflowHappens() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> HybridTimestamp.MAX_VALUE.addPhysicalTime(1)
+ );
+ assertThat(ex.getMessage(), startsWith("Time is out of bounds: "));
+ }
+
+ @Test
+ void subtractPhysicalTimeDecrementsPhysicalComponent() {
+ HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+ HybridTimestamp after = before.subtractPhysicalTime(1000);
+
+ assertThat(after.getPhysical(), is(1001L));
+ }
+
+ @Test
+ void subtractPhysicalTimeLeavesLogicalComponentIntact() {
+ HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+ HybridTimestamp after = before.subtractPhysicalTime(1000);
+
+ assertThat(after.getLogical(), is(2));
+ }
+
+ @Test
+ void subtractPhysicalTimeThrowsIfDecrementIsTooBig() {
+ HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+ // Exercise subtractPhysicalTime() itself: a decrement of 2^PHYSICAL_TIME_BITS_SIZE must hit its own bounds check.
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> before.subtractPhysicalTime(1L << PHYSICAL_TIME_BITS_SIZE)
+ );
+ assertThat(ex.getMessage(), is("Physical time is out of bounds: 281474976710656"));
+ }
+
+ @Test
+ void subtractPhysicalTimeThrowsIfUnderflowHappens() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> HybridTimestamp.MIN_VALUE.subtractPhysicalTime(1_000_000)
+ );
+ assertThat(ex.getMessage(), startsWith("Time is out of bounds: "));
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
new file mode 100644
index 0000000000..3dc90b84e6
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Implements Schema Synchronization wait logic as defined in IEP-98.
+ */
+public interface SchemaSyncService {
+ /**
+ * Waits till metadata (like table/index schemas) is complete for the given timestamp.
+ * 'Complete' here means that no metadata change can arrive later that would change how
+ * a table/index/etc looks at the given timestamp.
+ *
+ * @param ts Timestamp of interest.
+ * @return Future that completes when it is safe to query the Catalog at the given timestamp
+ * (as its data will remain unchanged for the timestamp).
+ */
+ CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp ts);
+
+ /**
+ * Returns {@code true} if we already have metadata (like table/index schemas)
+ * for the given catalog version.
+ *
+ * @param catalogVersion Version of the Catalog to check.
+ * @return {@code true} if metadata for the given catalog version is already available,
+ * {@code false} otherwise.
+ */
+ boolean isMetadataAvailableFor(int catalogVersion);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
new file mode 100644
index 0000000000..91d7a38017
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+
+/**
+ * A default implementation of {@link SchemaSyncService}.
+ *
+ * <p>Metadata completeness for a timestamp is awaited by asking the cluster time to reach that timestamp
+ * minus the delay duration (read from the supplier on every call). Availability of a catalog version is
+ * decided by comparing it with the latest catalog version reported by the catalog service.
+ */
+public class SchemaSyncServiceImpl implements SchemaSyncService {
+ private final ClusterTime clusterTime;
+
+ private final CatalogService catalogService;
+
+ private final LongSupplier delayDurationMs;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterTime Cluster time whose {@code waitFor} future is returned from
+ * {@link #waitForMetadataCompleteness(HybridTimestamp)}.
+ * @param catalogService Catalog service queried for the latest catalog version.
+ * @param delayDurationMs Supplier of the delay, in milliseconds, that is subtracted from the physical
+ * component of the requested timestamp before waiting.
+ */
+ public SchemaSyncServiceImpl(ClusterTime clusterTime, CatalogService
catalogService, LongSupplier delayDurationMs) {
+ this.clusterTime = clusterTime;
+ this.catalogService = catalogService;
+ this.delayDurationMs = delayDurationMs;
+ }
+
+ @Override
+ public CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp
ts) {
+ return
clusterTime.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()));
+ }
+
+ @Override
+ public boolean isMetadataAvailableFor(int catalogVersion) {
+ return catalogVersion <= catalogService.latestCatalogVersion();
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
new file mode 100644
index 0000000000..0cfc5472d6
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaSyncServiceImplTest {
+ private static final long DELAY_DURATION = 500;
+
+ @Mock
+ private ClusterTime clusterTime;
+
+ @Mock
+ private CatalogService catalogService;
+
+ private final LongSupplier delayDurationMs = () -> DELAY_DURATION;
+
+ private SchemaSyncServiceImpl schemaSyncService;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ @BeforeEach
+ void createSchemaSyncService() {
+ schemaSyncService = new SchemaSyncServiceImpl(clusterTime,
catalogService, delayDurationMs);
+ }
+
+ @Test
+ void waitsTillSchemaCompletenessSubtractingDelayDuration() {
+ HybridTimestamp ts = clock.now();
+ CompletableFuture<Void> clusterTimeFuture = new CompletableFuture<>();
+
+ HybridTimestamp tsMinusDelayDuration =
ts.subtractPhysicalTime(delayDurationMs.getAsLong());
+
when(clusterTime.waitFor(tsMinusDelayDuration)).thenReturn(clusterTimeFuture);
+
+ CompletableFuture<Void> waitFuture =
schemaSyncService.waitForMetadataCompleteness(ts);
+
+ assertThat(waitFuture, is(not(completedFuture())));
+
+ clusterTimeFuture.complete(null);
+ assertThat(waitFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void isMetadataAvailableForConsultsCatalogService() {
+ when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+ assertThat(schemaSyncService.isMetadataAvailableFor(5), is(true));
+ assertThat(schemaSyncService.isMetadataAvailableFor(6), is(false));
+ }
+}