This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 29afe19116b IGNITE-27479 System View. Introduce late materialization for row from system views (#7333)
29afe19116b is described below

commit 29afe19116b1965442e997899a2b7b75da9837f7
Author: korlov42 <[email protected]>
AuthorDate: Wed Dec 31 09:46:25 2025 +0200

    IGNITE-27479 System View. Introduce late materialization for row from system views (#7333)
---
 .../systemviews/TablesSystemViewProvider.java      |  23 +-
 .../ignite/internal/util/FlatteningIterator.java   |  86 +++++++
 .../internal/util/FlatteningIteratorTest.java      | 225 ++++++++++++++++++
 .../internal/benchmark/SystemViewsBenchmark.java   | 156 ++++++++++++
 .../internal/systemview/SystemViewManagerImpl.java | 261 +++++++++++++++++++--
 5 files changed, 723 insertions(+), 28 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
index 228023600dd..25088dc1f98 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
@@ -31,7 +31,9 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.systemview.api.SystemView;
 import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.util.FlatteningIterator;
 import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.TransformingIterator;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -99,16 +101,19 @@ public class TablesSystemViewProvider implements 
CatalogSystemViewProvider {
         Iterable<ColumnMetadata> viewData = () -> {
             Catalog catalog = catalogSupplier.get();
 
-            return catalog.tables().stream()
-                    .flatMap(table -> table.columns().stream()
-                            .map(columnDescriptor -> new ColumnMetadata(
-                                            
catalog.schema(table.schemaId()).name(),
-                                            table,
-                                            columnDescriptor
-                                    )
-                            )
+            return new FlatteningIterator<>(
+                    new TransformingIterator<>(
+                            catalog.tables().iterator(),
+                            table -> {
+                                String schemaName = 
catalog.schema(table.schemaId()).name();
+
+                                return TransformingIterator.newIterable(
+                                        table.columns(),
+                                        column -> new 
ColumnMetadata(schemaName, table, column)
+                                );
+                            }
                     )
-                    .iterator();
+            );
         };
 
         Publisher<ColumnMetadata> viewDataPublisher = 
SubscriptionUtils.fromIterable(viewData);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
new file mode 100644
index 00000000000..3a5cadae765
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An iterator which implements FLAT MAP operation on the input.
+ *
+ * <p>The elements of every {@link Iterable} supplied by the source iterator are returned one after another. The next
+ * {@link Iterable} is requested from the source only once the current inner iterator is drained.
+ *
+ * @param <T> Type of the elements returned by this iterator.
+ */
+public class FlatteningIterator<T> implements Iterator<T>, AutoCloseable {
+    private final Iterator<? extends Iterable<T>> source;
+
+    /** Inner iterator currently being drained, {@code null} if none is opened yet or the last one is exhausted. */
+    private @Nullable Iterator<T> current;
+
+    /** Inner iterator that produced the element most recently returned by {@link #next()}, {@code null} if nothing can be removed. */
+    private @Nullable Iterator<T> lastReturned;
+
+    /**
+     * Constructs the object.
+     *
+     * @param source Iterator over the iterables to flatten.
+     */
+    public FlatteningIterator(Iterator<? extends Iterable<T>> source) {
+        this.source = source;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasNext() {
+        do {
+            if (current != null) {
+                if (current.hasNext()) {
+                    return true;
+                }
+
+                // current is completely drained, reset.
+                current = null;
+            }
+
+            if (!source.hasNext()) {
+                return false;
+            }
+
+            current = source.next().iterator();
+        } while (true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        assert current != null;
+
+        Iterator<T> producer = current;
+        T element = producer.next();
+
+        // Remember the producer: a later hasNext() may move "current" to another inner iterator,
+        // while remove() must still target the iterator that returned this element.
+        lastReturned = producer;
+
+        return element;
+    }
+
+    /**
+     * Removes the element most recently returned by {@link #next()} by delegating to the inner iterator that produced it.
+     *
+     * @throws IllegalStateException If {@link #next()} has not been called yet, or the last returned element was already removed.
+     */
+    @Override
+    public void remove() {
+        Iterator<T> producer = lastReturned;
+
+        if (producer == null) {
+            throw new IllegalStateException();
+        }
+
+        producer.remove();
+
+        // Only one removal is allowed per call to next().
+        lastReturned = null;
+    }
+
+    /**
+     * Closes the source iterator if it implements {@link AutoCloseable}. Inner iterators are not closed.
+     */
+    @Override
+    public void close() throws Exception {
+        if (source instanceof AutoCloseable) {
+            ((AutoCloseable) source).close();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
new file mode 100644
index 00000000000..db6f5902da1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.junit.jupiter.api.Test;
+
+/** Set of tests to verify {@link FlatteningIterator}. */
+@SuppressWarnings("resource")
+class FlatteningIteratorTest {
+    /** Elements of several non-empty iterables are returned in order. */
+    @Test
+    void testBasicFlattening() {
+        List<List<Integer>> input = List.of(
+                List.of(1, 2, 3),
+                List.of(4, 5),
+                List.of(6, 7, 8, 9)
+        );
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        List<Integer> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+
+        assertEquals(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9), result);
+    }
+
+    /** A source without any iterables yields nothing. */
+    @Test
+    void testEmptySource() {
+        List<List<Integer>> input = List.of();
+
+        @SuppressWarnings("RedundantOperationOnEmptyContainer")
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertFalse(iterator.hasNext());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    /** A source consisting only of empty iterables yields nothing. */
+    @Test
+    void testAllEmptyIterables() {
+        List<List<Integer>> input = List.of(
+                List.of(),
+                List.of(),
+                List.of()
+        );
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertFalse(iterator.hasNext());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    /** Empty iterables between non-empty ones are skipped. */
+    @Test
+    void testMixedEmptyAndNonEmpty() {
+        List<List<Integer>> input = List.of(
+                List.of(),
+                List.of(1, 2),
+                List.of(),
+                List.of(3),
+                List.of()
+        );
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        List<Integer> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+
+        assertEquals(List.of(1, 2, 3), result);
+    }
+
+    @Test
+    void testSingleElementSingleIterable() {
+        List<List<String>> input = List.of(
+                List.of("hello")
+        );
+
+        FlatteningIterator<String> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertTrue(iterator.hasNext());
+        assertEquals("hello", iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    /** Calling {@code next()} without {@code hasNext()} works, and fails once everything is consumed. */
+    @Test
+    void testExhaustedIterator() {
+        List<List<Integer>> input = List.of(List.of(1, 2));
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        iterator.next(); // 1
+        iterator.next(); // 2
+
+        assertFalse(iterator.hasNext());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    /** Repeated {@code hasNext()} calls do not consume elements. */
+    @Test
+    void testMultipleHasNextCalls() {
+        List<List<Integer>> input = List.of(List.of(1, 2));
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertTrue(iterator.hasNext());
+        assertTrue(iterator.hasNext());
+        assertTrue(iterator.hasNext());
+
+        assertEquals(1, iterator.next());
+
+        assertTrue(iterator.hasNext());
+        assertTrue(iterator.hasNext());
+
+        assertEquals(2, iterator.next());
+    }
+
+    @Test
+    void testWithStrings() {
+        List<List<String>> input = List.of(
+                List.of("a", "b"),
+                List.of("c", "d", "e")
+        );
+
+        FlatteningIterator<String> iterator = new FlatteningIterator<>(input.iterator());
+
+        List<String> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+
+        assertEquals(List.of("a", "b", "c", "d", "e"), result);
+    }
+
+    /** Closing the flattening iterator closes a source iterator that is {@link AutoCloseable}. */
+    @Test
+    void testAutoCloseable() {
+        class ClosableIterator<T> implements Iterator<T>, AutoCloseable {
+            private final Iterator<T> delegate;
+            private boolean closeCalled;
+
+            private ClosableIterator(Iterator<T> delegate) {
+                this.delegate = delegate;
+            }
+
+            @Override
+            public void close() {
+                closeCalled = true;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return delegate.hasNext();
+            }
+
+            @Override
+            public T next() {
+                return delegate.next();
+            }
+        }
+
+        ClosableIterator<List<Integer>> input = new ClosableIterator<>(List.of(
+                List.of(1, 2, 3)
+        ).iterator());
+
+        assertDoesNotThrow(() -> {
+            try (FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input)) {
+                iterator.next();
+            }
+        });
+        assertTrue(input.closeCalled);
+    }
+
+    @Test
+    void testEmptyIterableAtBeginning() {
+        List<List<Integer>> input = List.of(
+                List.of(),
+                List.of(1, 2, 3)
+        );
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertTrue(iterator.hasNext());
+        assertEquals(1, iterator.next());
+    }
+
+    @Test
+    void testEmptyIterableAtEnd() {
+        List<List<Integer>> input = List.of(
+                List.of(1, 2, 3),
+                List.of()
+        );
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        iterator.next(); // 1
+        iterator.next(); // 2
+        iterator.next(); // 3
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** {@code remove()} called right after {@code next()} removes the returned element from its own collection. */
+    @Test
+    void testRemoveAfterNext() {
+        List<Integer> first = new ArrayList<>(List.of(1, 2));
+        List<Integer> second = new ArrayList<>(List.of(3));
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(List.of(first, second).iterator());
+
+        assertEquals(1, iterator.next());
+        iterator.remove();
+
+        assertEquals(List.of(2), first);
+        assertEquals(List.of(3), second);
+    }
+
+    /** {@code remove()} is rejected when no element has been returned yet. */
+    @Test
+    void testRemoveBeforeNext() {
+        List<List<Integer>> input = List.of(new ArrayList<>(List.of(1)));
+
+        FlatteningIterator<Integer> iterator = new FlatteningIterator<>(input.iterator());
+
+        assertThrows(IllegalStateException.class, iterator::remove);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
new file mode 100644
index 00000000000..d2ea9ef91a2
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.sql.IgniteSql;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that runs SQL queries over system views to measure their performance.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SystemViewsBenchmark extends AbstractMultiNodeBenchmark {
+    private IgniteSql sql;
+
+    @Param("3")
+    private int clusterSize;
+
+    /** Obtains the SQL API of the public Ignite instance for use by the benchmark methods. */
+    @Setup
+    public void setUp() throws IOException {
+        sql = publicIgnite.sql();
+    }
+
+    /**
+     * Benchmark to measure performance of queries over the {@code SYSTEM.TABLE_COLUMNS} system view:
+     * selects names of the primary key columns of a randomly chosen table.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void getPrimaryKeys(FiveHundredTablesState state, Blackhole bh) {
+        try (var rs = sql.execute(null, "SELECT column_name\n"
+                + "FROM SYSTEM.table_columns\n"
+                + "WHERE table_name = ?\n"
+                + "AND pk_column_ordinal IS NOT NULL;", state.tableName())) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Benchmark to measure performance of queries over the {@code SYSTEM.TABLE_COLUMNS} system view:
+     * selects names and types of all columns of a randomly chosen table.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void getColumnsType(FiveHundredTablesState state, Blackhole bh) {
+        try (var rs = sql.execute(null, "SELECT column_name,column_type\n"
+                + "FROM SYSTEM.table_columns\n"
+                + "WHERE table_name = ?;", state.tableName())) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /** State that creates 500 tables with 10 columns each and 2 columns primary key. */
+    @State(Scope.Benchmark)
+    public static class FiveHundredTablesState {
+        private static final int TABLES_COUNT = 500;
+
+        /** Creates zone {@code my_zone}, makes it the default one, and creates all the tables with a single script. */
+        @Setup
+        public void setUp() throws IOException {
+            IgniteSql sql = publicIgnite.sql();
+
+            int columnsCount = 10;
+
+            StringBuilder scriptBuilder = new StringBuilder(
+                    "CREATE ZONE my_zone (PARTITIONS 1, REPLICAS 1) STORAGE PROFILES ['default'];"
+            ).append("ALTER ZONE my_zone SET DEFAULT;");
+
+            for (int t = 0; t < TABLES_COUNT; t++) {
+                scriptBuilder.append("CREATE TABLE my_table_").append(t).append("(");
+
+                for (int c = 0; c < columnsCount; c++) {
+                    if (c > 0) {
+                        scriptBuilder.append(", ");
+                    }
+
+                    scriptBuilder.append("c_").append(c).append(" INT");
+                }
+
+                scriptBuilder.append(", PRIMARY KEY (c_1, c_2));");
+            }
+
+            sql.executeScript(scriptBuilder.toString());
+        }
+
+        /**
+         * Returns the name of a randomly chosen table out of the ones created by {@link #setUp()}.
+         * The name is upper-cased, presumably because unquoted identifiers are normalized to upper case (to be confirmed).
+         */
+        @SuppressWarnings("MethodMayBeStatic")
+        String tableName() {
+            return "MY_TABLE_" + ThreadLocalRandom.current().nextInt(TABLES_COUNT);
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + SystemViewsBenchmark.class.getSimpleName() + ".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    @Override
+    protected void createDistributionZoneOnStartup() {
+        // NO-OP
+    }
+
+    @Override
+    protected void createTablesOnStartup() {
+        // NO-OP
+    }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
+}
diff --git 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index cbf25438ce2..a873afaba6a 100644
--- 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++ 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -23,12 +23,22 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,17 +56,16 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.InternalTuple;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.systemview.api.NodeSystemView;
 import org.apache.ignite.internal.systemview.api.SystemView;
-import org.apache.ignite.internal.systemview.api.SystemViewColumn;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.systemview.utils.SystemViewUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.subscription.TransformingPublisher;
 import org.apache.ignite.lang.ErrorGroups.Common;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * SQL system views manager implementation.
@@ -246,8 +255,14 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
     private static Map<String, ScannableView<?>> toScannableViews(String 
localNodeName, Map<String, SystemView<?>> views) {
         Map<String, ScannableView<?>> scannableViews = new HashMap<>();
 
+        ViewRowFactory nodeViewRowFactory = new 
NodeViewRowFactory(localNodeName);
+
         for (SystemView<?> view : views.values()) {
-            scannableViews.put(view.name(), new ScannableView<>(localNodeName, 
(SystemView<Object>) view));
+            ViewRowFactory rowFactory = view instanceof NodeSystemView
+                    ? nodeViewRowFactory
+                    : ClusterViewRowFactory.INSTANCE;
+
+            scannableViews.put(view.name(), new ScannableView<>(rowFactory, 
(SystemView<Object>) view));
         }
 
         return Map.copyOf(scannableViews);
@@ -256,30 +271,238 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
     private static class ScannableView<T> {
         private final Publisher<InternalTuple> publisher;
 
-        private ScannableView(String localNodeName, SystemView<T> view) {
+        private ScannableView(ViewRowFactory rowFactory, SystemView<T> view) {
             BinaryTupleSchema schema = tupleSchemaForView(view);
+            this.publisher = new TransformingPublisher<>(view.dataProvider(), 
object -> rowFactory.create(schema, view, object));
+        }
 
-            this.publisher = new TransformingPublisher<>(view.dataProvider(), 
object -> {
-                BinaryTupleBuilder builder = new 
BinaryTupleBuilder(schema.elementCount());
+        Publisher<InternalTuple> scan() {
+            return publisher;
+        }
+    }
 
-                int offset = 0;
-                if (view instanceof NodeSystemView) {
-                    builder.appendString(localNodeName);
-                    offset++;
-                }
+    /** Factory of rows that expose an object emitted by a system view as an {@link InternalTuple}. */
+    private abstract static class ViewRowFactory {
+        /**
+         * Creates a row for the given source object.
+         *
+         * @param schema Schema of the resulting tuple.
+         * @param view View that emitted the source object.
+         * @param source Object emitted by the data provider of the view.
+         * @return Tuple representing the source object.
+         */
+        abstract <ViewSourceT> InternalTuple create(BinaryTupleSchema schema, SystemView<ViewSourceT> view, ViewSourceT source);
+    }
 
-                for (int i = 0; i < view.columns().size(); i++) {
-                    SystemViewColumn<T, ?> column = view.columns().get(i);
+    private static class NodeViewRowFactory extends ViewRowFactory {
+        private final String nodeName;
 
-                    schema.appendValue(builder, i + offset, 
column.value().apply(object));
-                }
+        private NodeViewRowFactory(String nodeName) {
+            this.nodeName = nodeName;
+        }
 
-                return new BinaryTuple(schema.elementCount(), builder.build());
-            });
+        @Override
+        <ViewSourceT> InternalTuple create(BinaryTupleSchema schema, 
SystemView<ViewSourceT> view, ViewSourceT source) {
+            return new NodeViewRow<>(schema, nodeName, view, source);
         }
+    }
 
-        Publisher<InternalTuple> scan() {
-            return publisher;
+    /** Stateless factory producing {@link ClusterViewRow} instances; shared through {@link #INSTANCE}. */
+    private static class ClusterViewRowFactory extends ViewRowFactory {
+        private static final ViewRowFactory INSTANCE = new ClusterViewRowFactory();
+
+        /** {@inheritDoc} */
+        @Override
+        <ViewSourceT> InternalTuple create(BinaryTupleSchema schema, SystemView<ViewSourceT> view, ViewSourceT source) {
+            return new ClusterViewRow<>(schema, view, source);
+        }
+    }
+
+    /**
+     * Row whose column 0 is the node name and whose remaining columns are computed on demand
+     * by applying the column functions of the view to the source object.
+     */
+    private static class NodeViewRow<T> extends AbstractViewRow {
+        private final String nodeName;
+        private final SystemView<T> view;
+        private final T source;
+
+        private NodeViewRow(BinaryTupleSchema schema, String nodeName, SystemView<T> view, T source) {
+            super(schema);
+
+            this.nodeName = nodeName;
+            this.view = view;
+            this.source = source;
+        }
+
+        /** Returns the number of view columns plus one for the leading node name column. */
+        @Override
+        public int elementCount() {
+            return view.columns().size() + 1;
+        }
+
+        /** Returns the node name for index 0, otherwise the value of the view column at {@code columnIndex - 1}. */
+        @Override
+        @SuppressWarnings("unchecked") // The caller picks the accessor, hence the expected type of the column value.
+        <ReturnT> ReturnT value(int columnIndex) {
+            if (columnIndex == 0) {
+                return (ReturnT) nodeName;
+            }
+
+            return (ReturnT) view.columns().get(columnIndex - 1).value().apply(source);
+        }
+    }
+
+    /** Row whose columns are computed on demand by applying the column functions of the view to the source object. */
+    private static class ClusterViewRow<T> extends AbstractViewRow {
+        private final SystemView<T> view;
+        private final T source;
+
+        private ClusterViewRow(BinaryTupleSchema schema, SystemView<T> view, T source) {
+            super(schema);
+
+            this.view = view;
+            this.source = source;
+        }
+
+        /** Returns the number of columns of the view. */
+        @Override
+        public int elementCount() {
+            return view.columns().size();
+        }
+
+        /** Returns the value of the view column at {@code columnIndex}. */
+        @Override
+        @SuppressWarnings("unchecked") // The caller picks the accessor, hence the expected type of the column value.
+        <ReturnT> ReturnT value(int columnIndex) {
+            return (ReturnT) view.columns().get(columnIndex).value().apply(source);
+        }
+    }
+
+    /**
+     * Base class of view rows. Every typed accessor delegates to {@link #value(int)}, so a column value is computed
+     * only when it is requested; the binary representation is built only by {@link #byteBuffer()}.
+     */
+    private abstract static class AbstractViewRow implements InternalTuple {
+        private final BinaryTupleSchema schema;
+
+        private AbstractViewRow(BinaryTupleSchema schema) {
+            this.schema = schema;
+        }
+
+        /**
+         * Returns the value of the column with the given index, cast to the type expected by the caller.
+         *
+         * @param columnIndex Index of the column.
+         * @return Value of the column.
+         */
+        abstract <T> T value(int columnIndex);
+
+        @Override
+        public boolean hasNullValue(int columnIndex) {
+            return value(columnIndex) == null;
+        }
+
+        // Accessors returning a primitive unbox the column value, so they throw NullPointerException for a null value.
+
+        @Override
+        public boolean booleanValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Boolean booleanValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public byte byteValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Byte byteValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public short shortValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Short shortValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public int intValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Integer intValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public long longValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Long longValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public float floatValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Float floatValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public double doubleValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Double doubleValueBoxed(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        /**
+         * Returns the decimal value of the column rescaled to the given scale, or {@code null} if the value is {@code null}.
+         * Rescaling uses {@link RoundingMode#UNNECESSARY}, so an {@link ArithmeticException} is thrown if it would require rounding.
+         */
+        @Override
+        public @Nullable BigDecimal decimalValue(int columnIndex, int scale) {
+            BigDecimal value = value(columnIndex);
+
+            if (value == null) {
+                return null;
+            }
+
+            return value.setScale(scale, RoundingMode.UNNECESSARY);
+        }
+
+        @Override
+        public String stringValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public byte[] bytesValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public UUID uuidValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public LocalDate dateValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public LocalTime timeValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public LocalDateTime dateTimeValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Instant timestampValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Period periodValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        @Override
+        public Duration durationValue(int columnIndex) {
+            return value(columnIndex);
+        }
+
+        /** Builds the binary tuple by appending the value of every column of the schema; nothing is cached between calls. */
+        @Override
+        public ByteBuffer byteBuffer() {
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(schema.elementCount());
+
+            for (int i = 0; i < schema.elementCount(); i++) {
+                schema.appendValue(builder, i, value(i));
+            }
+
+            return builder.build();
         }
     }
 }

Reply via email to