This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 48270e3e2d7d48bdfce4be1caadda1e7f85000a4 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Jun 2 17:52:12 2021 +0200 [FLINK-21356][state/changelog] Implement checkpointing using changelog Both materialized and non-materialized state are checkpointed. But materialization and recovery from changelog will be implemented in subsequent commits. For now, - changelog grows indefinitely (not truncated) - state changes are not applied on recovery, tests fail with incorrect results --- .../metadata/MetadataV2V3SerializerBase.java | 38 ++++++ .../changelog/ChangelogStateBackendHandle.java | 143 +++++++++++++++++++++ .../inmemory/InMemoryStateChangelogHandle.java | 2 +- .../inmemory/InMemoryStateChangelogWriter.java | 11 +- .../changelog/ChangelogKeyedStateBackend.java | 140 +++++++++++++++++++- .../state/changelog/ChangelogStateBackend.java | 21 ++- 6 files changed, 343 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 0ea7337..1be1fa5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -38,7 +38,9 @@ import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; @@ -99,6 +101,7 @@ public abstract class MetadataV2V3SerializerBase { private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5; private static final byte RELATIVE_STREAM_STATE_HANDLE = 6; private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7; + private static final byte CHANGELOG_HANDLE = 8; private static final byte CHANGELOG_BYTE_INCREMENT_HANDLE = 9; private static final byte CHANGELOG_FILE_INCREMENT_HANDLE = 10; @@ -316,6 +319,23 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + } else if (stateHandle instanceof ChangelogStateBackendHandle) { + ChangelogStateBackendHandle handle = (ChangelogStateBackendHandle) stateHandle; + + dos.writeByte(CHANGELOG_HANDLE); + + dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); + + dos.writeInt(handle.getMaterializedStateHandles().size()); + for (KeyedStateHandle k : handle.getMaterializedStateHandles()) { + serializeKeyedStateHandle(k, dos); + } + + dos.writeInt(handle.getNonMaterializedStateHandles().size()); + for (KeyedStateHandle k : handle.getNonMaterializedStateHandles()) { + serializeKeyedStateHandle(k, dos); + } } else if (stateHandle instanceof InMemoryStateChangelogHandle) { InMemoryStateChangelogHandle handle = (InMemoryStateChangelogHandle) stateHandle; @@ -406,6 +426,24 @@ public abstract class MetadataV2V3SerializerBase { sharedStates, privateStates, metaDataStateHandle); + } else if (CHANGELOG_HANDLE == type) { + + int startKeyGroup = dis.readInt(); + int numKeyGroups = dis.readInt(); + KeyGroupRange keyGroupRange = + KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); + int baseSize = dis.readInt(); + List<KeyedStateHandle> base = new ArrayList<>(baseSize); + for (int i = 0; i < baseSize; i++) { + base.add(deserializeKeyedStateHandle(dis, context)); + } + int deltaSize = dis.readInt(); + List<StateChangelogHandle> delta = new ArrayList<>(deltaSize); + for (int i = 0; i < deltaSize; i++) { + delta.add((StateChangelogHandle) deserializeKeyedStateHandle(dis, context)); + } + return new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl( + base, delta, keyGroupRange); } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) { int start = dis.readInt(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java new file mode 100644 index 0000000..7940210 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableList; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A handle to ChangelogStateBackend state. Consists of the base and delta parts. Base part + * references materialized state (e.g. SST files), while delta part references state changes that + * were not not materialized at the time of the snapshot. Both are potentially empty lists as there + * can be no state or multiple states (e.g. after rescaling). + */ +@Internal +public interface ChangelogStateBackendHandle extends KeyedStateHandle { + List<KeyedStateHandle> getMaterializedStateHandles(); + + List<StateChangelogHandle> getNonMaterializedStateHandles(); + + class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle { + private static final long serialVersionUID = 1L; + private final List<KeyedStateHandle> materialized; + private final List<StateChangelogHandle> nonMaterialized; + private final KeyGroupRange keyGroupRange; + + public ChangelogStateBackendHandleImpl( + List<KeyedStateHandle> materialized, + List<StateChangelogHandle> nonMaterialized, + KeyGroupRange keyGroupRange) { + this.materialized = unmodifiableList(materialized); + this.nonMaterialized = unmodifiableList(nonMaterialized); + this.keyGroupRange = keyGroupRange; + checkArgument(keyGroupRange.getNumberOfKeyGroups() > 0); + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + stateRegistry.registerAll(materialized); + stateRegistry.registerAll(nonMaterialized); + } + + @Override + public void discardState() throws Exception { + try (Closer closer = Closer.create()) { + materialized.forEach(h -> closer.register(asCloseable(h))); + nonMaterialized.forEach(h -> closer.register(asCloseable(h))); + } + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Nullable + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + // todo: revisit/review + KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange); + if (intersection.getNumberOfKeyGroups() == 0) { + return null; + } + List<KeyedStateHandle> basePart = + this.materialized.stream() + .map(handle -> handle.getIntersection(keyGroupRange)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + List<StateChangelogHandle> deltaPart = + this.nonMaterialized.stream() + .map( + handle -> + (StateChangelogHandle) + handle.getIntersection(keyGroupRange)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return new ChangelogStateBackendHandleImpl(basePart, deltaPart, intersection); + } + + @Override + public long getStateSize() { + return materialized.stream().mapToLong(StateObject::getStateSize).sum() + + nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum(); + } + + @Override + public List<KeyedStateHandle> getMaterializedStateHandles() { + return materialized; + } + + @Override + public List<StateChangelogHandle> getNonMaterializedStateHandles() { + return nonMaterialized; + } + + @Override + public String toString() { + return String.format( + "keyGroupRange=%s, basePartSize=%d, deltaPartSize=%d", + keyGroupRange, materialized.size(), nonMaterialized.size()); + } + + private static Closeable asCloseable(KeyedStateHandle h) { + return () -> { + try { + h.discardState(); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + }; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java index f492cad..8735f98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java @@ -84,7 +84,7 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { - throw new UnsupportedOperationException(); + // do nothing } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index c63fb93..3c15984 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -104,16 +104,13 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState @Override public void truncate(SequenceNumber before) { - changesByKeyGroup.forEach((k, v) -> {}); + changesByKeyGroup.forEach( + (kg, changesBySqn) -> changesBySqn.headMap(before, false).clear()); } @Override - public void confirm(SequenceNumber from, SequenceNumber to) { - throw new UnsupportedOperationException(); - } + public void confirm(SequenceNumber from, SequenceNumber to) {} @Override - public void reset(SequenceNumber from, SequenceNumber to) { - throw new UnsupportedOperationException(); - } + public void reset(SequenceNumber from, SequenceNumber to) {} } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index abe7568..7293c7b 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -44,6 +44,9 @@ import org.apache.flink.runtime.state.SavepointResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.TestableKeyedStateBackend; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.internal.InternalKvState; @@ -52,12 +55,23 @@ import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -74,6 +88,7 @@ class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K> { + private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class); private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Stream.of( @@ -109,6 +124,8 @@ class ChangelogKeyedStateBackend<K> private final StateChangelogWriter<?> stateChangelogWriter; + private long lastCheckpointId = -1L; + /** last accessed partitioned state. */ @SuppressWarnings("rawtypes") private InternalKvState lastState; @@ -116,6 +133,39 @@ class ChangelogKeyedStateBackend<K> /** For caching the last accessed partitioned state. */ private String lastName; + /** Updated initially on restore and later upon materialization (in FLINK-21357). */ + @GuardedBy("materialized") + private final List<KeyedStateHandle> materialized = new ArrayList<>(); + + /** Updated initially on restore and later cleared upon materialization (in FLINK-21357). */ + @GuardedBy("materialized") + private final List<StateChangelogHandle> nonMaterialized = new ArrayList<>(); + + /** + * {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. Updated to {@link + * #materializedTo} when {@link #snapshot(long, long, CheckpointStreamFactory, + * CheckpointOptions) starting snapshot}. Used to notify {@link #stateChangelogWriter} about + * changelog ranges that were confirmed or aborted by JM. + */ + @Nullable private SequenceNumber lastUploadedFrom; + /** + * {@link SequenceNumber} denoting last upload range <b>end</b>, exclusive. Updated to {@link + * org.apache.flink.runtime.state.changelog.StateChangelogWriter#lastAppendedSequenceNumber} + * when {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions) starting + * snapshot}. Used to notify {@link #stateChangelogWriter} about changelog ranges that were + * confirmed or aborted by JM. + */ + @Nullable private SequenceNumber lastUploadedTo; + /** + * The {@link SequenceNumber} up to which the state is materialized, exclusive. The log should + * be truncated accordingly. + * + * <p>WARN: currently not updated - to be changed in FLINK-21357. + * + * <p>WARN: this value needs to be updated for benchmarking, e.g. in notifyCheckpointComplete. + */ + private final SequenceNumber materializedTo; + public ChangelogKeyedStateBackend( AbstractKeyedStateBackend<K> keyedStateBackend, ExecutionConfig executionConfig, @@ -126,6 +176,7 @@ class ChangelogKeyedStateBackend<K> this.ttlTimeProvider = ttlTimeProvider; this.keyValueStatesByName = new HashMap<>(); this.stateChangelogWriter = stateChangelogWriter; + this.materializedTo = stateChangelogWriter.initialSequenceNumber(); } // -------------------- CheckpointableKeyedStateBackend -------------------------------- @@ -240,8 +291,43 @@ class ChangelogKeyedStateBackend<K> @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception { - return keyedStateBackend.snapshot( - checkpointId, timestamp, streamFactory, checkpointOptions); + // The range to upload may overlap with the previous one(s). To reuse them, we could store + // the previous results either here in the backend or in the writer. However, + // materialization may truncate only a part of the previous result and the backend would + // have to split it somehow for the former option, so the latter is used. + lastCheckpointId = checkpointId; + lastUploadedFrom = materializedTo; + lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + LOG.debug( + "snapshot for checkpoint {}, change range: {}..{}", + checkpointId, + lastUploadedFrom, + lastUploadedTo); + return toRunnableFuture( + stateChangelogWriter + .persist(lastUploadedFrom) + .thenApply(this::buildSnapshotResult)); + } + + private SnapshotResult<KeyedStateHandle> buildSnapshotResult(StateChangelogHandle delta) { + // Can be called by either task thread during the sync checkpoint phase (if persist future + // was already completed); or by the writer thread otherwise. So need to synchronize. + // todo: revisit after FLINK-21357 - use mailbox action? + synchronized (materialized) { + // collections don't change once started and handles are immutable + List<StateChangelogHandle> prevDeltaCopy = new ArrayList<>(nonMaterialized); + if (delta != null && delta.getStateSize() > 0) { + prevDeltaCopy.add(delta); + } + if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) { + return SnapshotResult.empty(); + } else { + return SnapshotResult.of( + new ChangelogStateBackendHandleImpl( + materialized, prevDeltaCopy, getKeyGroupRange())); + } + } } @Nonnull @@ -283,11 +369,26 @@ class ChangelogKeyedStateBackend<K> // -------------------- CheckpointListener -------------------------------- @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (lastCheckpointId == checkpointId) { + // Notify the writer so that it can re-use the previous uploads. Do NOT notify it about + // a range status change if it is not relevant anymore. Otherwise, it could CONFIRM a + // newer upload instead of the previous one. This newer upload could then be re-used + // while in fact JM has discarded its results. + // This might change if the log ownership changes (the method won't likely be needed). + stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo); + } keyedStateBackend.notifyCheckpointComplete(checkpointId); } @Override public void notifyCheckpointAborted(long checkpointId) throws Exception { + if (lastCheckpointId == checkpointId) { + // Notify the writer so that it can clean up. Do NOT notify it about a range status + // change if it is not relevant anymore. Otherwise, it could DISCARD a newer upload + // instead of the previous one. Rely on truncation for the cleanup in this case. + // This might change if the log ownership changes (the method won't likely be needed). + stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo); + } keyedStateBackend.notifyCheckpointAborted(checkpointId); } @@ -372,4 +473,39 @@ class ChangelogKeyedStateBackend<K> InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger) throws Exception; } + + private static <T> RunnableFuture<T> toRunnableFuture(CompletableFuture<T> f) { + return new RunnableFuture<T>() { + @Override + public void run() { + f.join(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return f.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return f.isCancelled(); + } + + @Override + public boolean isDone() { + return f.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return f.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return f.get(timeout, unit); + } + }; + } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java index 36a7a59..d354203 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage; import org.apache.flink.runtime.state.delegate.DelegatingStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -46,6 +47,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; /** * This state backend holds the working state in the underlying delegatedStateBackend, and forwards @@ -105,10 +108,11 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab kvStateRegistry, ttlTimeProvider, metricGroup, - stateHandles, + extractMaterializedState(stateHandles), cancelStreamRegistry); // todo: FLINK-21804 get from Environment.getTaskStateManager InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); + // todo: apply state changes from non-materialized part of stateHandles return new ChangelogKeyedStateBackend<>( keyedStateBackend, env.getExecutionConfig(), @@ -144,12 +148,13 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab kvStateRegistry, ttlTimeProvider, metricGroup, - stateHandles, + extractMaterializedState(stateHandles), cancelStreamRegistry, managedMemoryFraction); // todo: FLINK-21804 get from Environment.getTaskStateManager InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); + // todo: apply state changes from non-materialized part of stateHandles return new ChangelogKeyedStateBackend<>( keyedStateBackend, env.getExecutionConfig(), @@ -190,4 +195,16 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab return this; } + + private static Collection<KeyedStateHandle> extractMaterializedState( + Collection<KeyedStateHandle> stateHandles) { + return stateHandles.stream() + .flatMap( + keyedStateHandle -> + ((ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl) + keyedStateHandle) + .getMaterializedStateHandles().stream()) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } }
