This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d5c265d933 IGNITE-19770 Add a mechanism to wait till a schema is 
available via Schema Sync at a ts (#2402)
d5c265d933 is described below

commit d5c265d9335fed1ab424ac4a4021c380f24d6503
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 7 07:56:24 2023 +0400

    IGNITE-19770 Add a mechanism to wait till a schema is available via Schema 
Sync at a ts (#2402)
---
 .github/PULL_REQUEST_TEMPLATE.md                   | 12 ++--
 .../ignite/internal/catalog/CatalogService.java    |  8 +--
 .../ignite/internal/hlc/HybridTimestamp.java       | 19 +++--
 .../ignite/internal/hlc/HybridTimestampTest.java   | 80 +++++++++++++++++++++
 .../distributed/schema/SchemaSyncService.java      | 44 ++++++++++++
 .../distributed/schema/SchemaSyncServiceImpl.java  | 54 ++++++++++++++
 .../schema/SchemaSyncServiceImplTest.java          | 84 ++++++++++++++++++++++
 7 files changed, 287 insertions(+), 14 deletions(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index e31ce5da18..0434205a85 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -1,19 +1,19 @@
 Thank you for submitting the pull request.
 
-In order to streamline the review of the patch and ensure better code quality
-we ask both an author and a reviewer to check the following requirements:
+To streamline the review process of the patch and ensure better code quality
+we ask both an author and a reviewer to verify the following:
 
 ### The Review Checklist
-- [ ] **Formal criteria:** TC status, codestyle, mandatory documentation. Also 
check the following list:  
+- [ ] **Formal criteria:** TC status, codestyle, mandatory documentation. Also 
make sure to complete the following:  
 \- There is a single JIRA ticket related to the pull request.  
 \- The web-link to the pull request is attached to the JIRA ticket.  
 \- The JIRA ticket has the Patch Available state.  
-\- The description of the JIRA ticket explains WHAT and WHY was made and HOW.  
+\- The description of the JIRA ticket explains WHAT was made, WHY and HOW.  
 \- The pull request title is treated as the final commit message. The 
following pattern must be used: IGNITE-XXXX Change summary where XXXX - number 
of JIRA issue.
 - [ ] **Design:** new code conforms with the design principles of the 
components it is added to.
-- [ ] **Patch quality:** patch cannot be split into a smaller pieces, its size 
is reasonable.
+- [ ] **Patch quality:** patch cannot be split into smaller pieces, its size 
must be reasonable.
 - [ ] **Code quality:** code is clean and readable, necessary developer 
documentation is added if needed.
-- [ ] **Tests code quality:** tests set covers positive/negative scenarios, 
happy/edge cases. Tests are effective in terms of execution time and resources.
+- [ ] **Tests code quality:** test set covers positive/negative scenarios, 
happy/edge cases. Tests are effective in terms of execution time and resources.
 
 ### Notes
 - [Apache Ignite Coding 
Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Java+Code+Style+Guide)
\ No newline at end of file
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c1d30f1b4c..66535869ba 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -57,17 +57,17 @@ public interface CatalogService {
 
     Collection<CatalogIndexDescriptor> indexes(int catalogVersion);
 
-    CatalogSchemaDescriptor schema(int version);
+    @Nullable CatalogSchemaDescriptor schema(int version);
 
-    CatalogSchemaDescriptor schema(@Nullable String schemaName, int version);
+    @Nullable CatalogSchemaDescriptor schema(@Nullable String schemaName, int 
version);
 
     CatalogZoneDescriptor zone(String zoneName, long timestamp);
 
     CatalogZoneDescriptor zone(int zoneId, long timestamp);
 
-    CatalogSchemaDescriptor activeSchema(long timestamp);
+    @Nullable CatalogSchemaDescriptor activeSchema(long timestamp);
 
-    CatalogSchemaDescriptor activeSchema(@Nullable String schemaName, long 
timestamp);
+    @Nullable CatalogSchemaDescriptor activeSchema(@Nullable String 
schemaName, long timestamp);
 
     int activeCatalogVersion(long timestamp);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 07563b3438..4889133ffb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -236,12 +236,23 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
     /**
      * Returns a new hybrid timestamp with incremented physical component.
      */
-    public HybridTimestamp addPhysicalTime(long mills) {
-        if (mills >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
-            throw new IllegalArgumentException("Physical time is out of 
bounds: " + mills);
+    public HybridTimestamp addPhysicalTime(long millis) {
+        if (millis >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
+            throw new IllegalArgumentException("Physical time is out of 
bounds: " + millis);
         }
 
-        return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
+        return new HybridTimestamp(time + (millis << LOGICAL_TIME_BITS_SIZE));
+    }
+
+    /**
+     * Returns a new hybrid timestamp with decremented physical component.
+     */
+    public HybridTimestamp subtractPhysicalTime(long millis) {
+        if (millis >= (1L << PHYSICAL_TIME_BITS_SIZE)) {
+            throw new IllegalArgumentException("Physical time is out of 
bounds: " + millis);
+        }
+
+        return new HybridTimestamp(time - (millis << LOGICAL_TIME_BITS_SIZE));
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
index 9a58e801cb..de2602c30c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridTimestampTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.hlc;
 
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.PHYSICAL_TIME_BITS_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.max;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.junit.jupiter.api.Test;
 
@@ -64,4 +68,80 @@ class HybridTimestampTest {
     void hashCodeSameWhenComponentsAreSame() {
         assertEquals(new HybridTimestamp(1, 2).hashCode(), new 
HybridTimestamp(1, 2).hashCode());
     }
+
+    @Test
+    void addPhysicalTimeIncrementsPhysicalComponent() {
+        HybridTimestamp before = new HybridTimestamp(1, 2);
+
+        HybridTimestamp after = before.addPhysicalTime(1000);
+
+        assertThat(after.getPhysical(), is(1001L));
+    }
+
+    @Test
+    void addPhysicalTimeLeavesLogicalComponentIntact() {
+        HybridTimestamp before = new HybridTimestamp(1, 2);
+
+        HybridTimestamp after = before.addPhysicalTime(1000);
+
+        assertThat(after.getLogical(), is(2));
+    }
+
+    @Test
+    void addPhysicalTimeThrowsIfIncrementIsTooBig() {
+        HybridTimestamp before = new HybridTimestamp(1, 2);
+
+        IllegalArgumentException ex = assertThrows(
+                IllegalArgumentException.class,
+                () -> before.addPhysicalTime(1L << PHYSICAL_TIME_BITS_SIZE)
+        );
+        assertThat(ex.getMessage(), is("Physical time is out of bounds: 
281474976710656"));
+    }
+
+    @Test
+    void addPhysicalTimeThrowsIfOverflowHappens() {
+        IllegalArgumentException ex = assertThrows(
+                IllegalArgumentException.class,
+                () -> HybridTimestamp.MAX_VALUE.addPhysicalTime(1)
+        );
+        assertThat(ex.getMessage(), startsWith("Time is out of bounds: "));
+    }
+
+    @Test
+    void subtractPhysicalTimeDecrementsPhysicalComponent() {
+        HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+        HybridTimestamp after = before.subtractPhysicalTime(1000);
+
+        assertThat(after.getPhysical(), is(1001L));
+    }
+
+    @Test
+    void subtractPhysicalTimeLeavesLogicalComponentIntact() {
+        HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+        HybridTimestamp after = before.subtractPhysicalTime(1000);
+
+        assertThat(after.getLogical(), is(2));
+    }
+
+    @Test
+    void subtractPhysicalTimeThrowsIfDecrementIsTooBig() {
+        HybridTimestamp before = new HybridTimestamp(2001, 2);
+
+        IllegalArgumentException ex = assertThrows(
+                IllegalArgumentException.class,
+                () -> before.addPhysicalTime(1L << PHYSICAL_TIME_BITS_SIZE)
+        );
+        assertThat(ex.getMessage(), is("Physical time is out of bounds: 
281474976710656"));
+    }
+
+    @Test
+    void subtractPhysicalTimeThrowsIfUnderflowHappens() {
+        IllegalArgumentException ex = assertThrows(
+                IllegalArgumentException.class,
+                () -> HybridTimestamp.MIN_VALUE.subtractPhysicalTime(1_000_000)
+        );
+        assertThat(ex.getMessage(), startsWith("Time is out of bounds: "));
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
new file mode 100644
index 0000000000..3dc90b84e6
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Implements Schema Synchronization wait logic as defined in IEP-98.
+ */
+public interface SchemaSyncService {
+    /**
+     * Waits till metadata (like table/index schemas) is complete for the 
given timestamp. The 'complete' here means
+     * that no metadata change can arrive later that would change how a 
table/index/etc looks at the given timestamp.
+     *
+     * @param ts Timestamp of interest.
+     * @return Future that completes when it is safe to query the Catalog at 
the given timestamp (as its data will
+     *     remain unchanged for the timestamp).
+     */
+    CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp ts);
+
+    /**
+     * Returns {@code true} if we already have metadata (like table/index 
schemas) for the given catalog version.
+     *
+     * @param catalogVersion Version of the Catalog to check.
+     * @return {@code true} if we already have metadata (like table/index 
schemas) for the given catalog version.
+     */
+    boolean isMetadataAvailableFor(int catalogVersion);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
new file mode 100644
index 0000000000..91d7a38017
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+
+/**
+ * A default implementation of {@link SchemaSyncService}.
+ */
+public class SchemaSyncServiceImpl implements SchemaSyncService {
+    private final ClusterTime clusterTime;
+
+    private final CatalogService catalogService;
+
+    private final LongSupplier delayDurationMs;
+
+    /**
+     * Constructor.
+     */
+    public SchemaSyncServiceImpl(ClusterTime clusterTime, CatalogService 
catalogService, LongSupplier delayDurationMs) {
+        this.clusterTime = clusterTime;
+        this.catalogService = catalogService;
+        this.delayDurationMs = delayDurationMs;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp 
ts) {
+        return 
clusterTime.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()));
+    }
+
+    @Override
+    public boolean isMetadataAvailableFor(int catalogVersion) {
+        return catalogVersion <= catalogService.latestCatalogVersion();
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
new file mode 100644
index 0000000000..0cfc5472d6
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaSyncServiceImplTest {
+    private static final long DELAY_DURATION = 500;
+
+    @Mock
+    private ClusterTime clusterTime;
+
+    @Mock
+    private CatalogService catalogService;
+
+    private final LongSupplier delayDurationMs = () -> DELAY_DURATION;
+
+    private SchemaSyncServiceImpl schemaSyncService;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @BeforeEach
+    void createSchemaSyncService() {
+        schemaSyncService = new SchemaSyncServiceImpl(clusterTime, 
catalogService, delayDurationMs);
+    }
+
+    @Test
+    void waitsTillSchemaCompletenessSubtractingDelayDuration() {
+        HybridTimestamp ts = clock.now();
+        CompletableFuture<Void> clusterTimeFuture = new CompletableFuture<>();
+
+        HybridTimestamp tsMinusDelayDuration = 
ts.subtractPhysicalTime(delayDurationMs.getAsLong());
+        
when(clusterTime.waitFor(tsMinusDelayDuration)).thenReturn(clusterTimeFuture);
+
+        CompletableFuture<Void> waitFuture = 
schemaSyncService.waitForMetadataCompleteness(ts);
+
+        assertThat(waitFuture, is(not(completedFuture())));
+
+        clusterTimeFuture.complete(null);
+        assertThat(waitFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void isMetadataAvailableForConsultsCatalogService() {
+        when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+        assertThat(schemaSyncService.isMetadataAvailableFor(5), is(true));
+        assertThat(schemaSyncService.isMetadataAvailableFor(6), is(false));
+    }
+}

Reply via email to