Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2218#discussion_r131126237
--- Diff:
storm-client/src/jvm/org/apache/storm/windowing/persistence/WindowState.java ---
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing.persistence;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.windowing.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around the window related states that are checkpointed.
+ */
+public class WindowState<T> extends AbstractCollection<Event<T>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(WindowState.class);
+
+ // number of events per window-partition
+ public static final int MAX_PARTITION_EVENTS = 1000;
+ public static final int MIN_PARTITIONS = 10;
+ private static final String PARTITION_IDS_KEY = "pk";
+ private final KeyValueState<String, Deque<Long>> partitionIdsState;
+ private final KeyValueState<Long, WindowPartition<T>>
windowPartitionsState;
+ private final KeyValueState<String, Optional<?>> windowSystemState;
+ // ordered partition keys
+ private volatile Deque<Long> partitionIds;
+ private volatile long latestPartitionId;
+ private volatile WindowPartition<T> latestPartition;
+ private volatile WindowPartitionCache<Long, WindowPartition<T>> cache;
+ private Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
+ private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
+ private final WindowPartitionLock windowPartitionsLock = new
WindowPartitionLock();
+ private final long maxEventsInMemory;
+ private Set<Long> iteratorPins = new HashSet<>();
+
+ public WindowState(KeyValueState<Long, WindowPartition<T>>
windowPartitionsState,
+ KeyValueState<String, Deque<Long>> partitionIdsState,
+ KeyValueState<String, Optional<?>> windowSystemState,
+ Supplier<Map<String, Optional<?>>>
windowSystemStateSupplier,
+ long maxEventsInMemory) {
+ this.windowPartitionsState = windowPartitionsState;
+ this.partitionIdsState = partitionIdsState;
+ this.windowSystemState = windowSystemState;
+ this.windowSystemStateSupplier = windowSystemStateSupplier;
+ this.maxEventsInMemory = Math.max(MAX_PARTITION_EVENTS *
MIN_PARTITIONS, maxEventsInMemory);
+ init();
+ }
+
+ @Override
+ public boolean add(Event<T> event) {
+ if (latestPartition.size() >= MAX_PARTITION_EVENTS) {
+ cache.unpin(latestPartition.getId());
+ latestPartition = getPinnedPartition(getNextPartitionId());
+ }
+ latestPartition.add(event);
+ return true;
+ }
+
+ @Override
+ public Iterator<Event<T>> iterator() {
+
+ return new Iterator<Event<T>>() {
+ private Iterator<Long> ids = getIds();
+ private Iterator<Event<T>> current =
Collections.emptyIterator();
+ private Iterator<Event<T>> removeFrom;
+ private WindowPartition<T> curPartition;
+
+ private Iterator<Long> getIds() {
+ try {
+ partitionIdsLock.lock();
+ LOG.debug("Iterator partitionIds: {}", partitionIds);
+ return new ArrayList<>(partitionIds).iterator();
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (removeFrom == null) {
+ throw new IllegalStateException("No calls to next()
since last call to remove()");
+ }
+ removeFrom.remove();
+ removeFrom = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean curHasNext = current.hasNext();
+ while (!curHasNext && ids.hasNext()) {
+ if (curPartition != null) {
+ unpin(curPartition.getId());
+ }
+ curPartition = getPinnedPartition(ids.next());
+ if (curPartition != null) {
+ iteratorPins.add(curPartition.getId());
+ current = curPartition.iterator();
+ curHasNext = current.hasNext();
+ }
+ }
+ // un-pin the last partition
+ if (!curHasNext && curPartition != null) {
+ unpin(curPartition.getId());
+ curPartition = null;
+ }
+ return curHasNext;
+ }
+
+ @Override
+ public Event<T> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ removeFrom = current;
+ return current.next();
+ }
+
+ private void unpin(long id) {
+ cache.unpin(id);
+ iteratorPins.remove(id);
+ }
+ };
+ }
+
+ public void clearIteratorPins() {
+ LOG.debug("clearIteratorPins '{}'", iteratorPins);
+ Iterator<Long> it = iteratorPins.iterator();
+ while (it.hasNext()) {
+ cache.unpin(it.next());
+ it.remove();
+ }
+ }
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Prepares the {@link WindowState} for commit.
+ *
+ * @param txid the transaction id
+ */
+ public void prepareCommit(long txid) {
+ flush();
+ partitionIdsState.prepareCommit(txid);
+ windowPartitionsState.prepareCommit(txid);
+ windowSystemState.prepareCommit(txid);
+ }
+
+ /**
+ * Commits the {@link WindowState}.
+ *
+ * @param txid the transaction id
+ */
+ public void commit(long txid) {
+ partitionIdsState.commit(txid);
+ windowPartitionsState.commit(txid);
+ windowSystemState.commit(txid);
+ }
+
+ /**
+ * Rolls back the {@link WindowState}.
+ *
+ * @param reInit if the members should be synced with the values from
the state.
+ */
+ public void rollback(boolean reInit) {
+ partitionIdsState.rollback();
+ windowPartitionsState.rollback();
+ windowSystemState.rollback();
+ // re-init cache and partitions
+ if (reInit) {
+ init();
+ }
+ }
+
+ private void init() {
+ initCache();
+ initPartitions();
+ }
+
+ private void initPartitions() {
+ partitionIds = partitionIdsState.get(PARTITION_IDS_KEY, new
LinkedList<>());
+ if (partitionIds.isEmpty()) {
+ partitionIds.add(0L);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ }
+ latestPartitionId = partitionIds.peekLast();
+ latestPartition = cache.pinAndGet(latestPartitionId);
+ }
+
+ private void initCache() {
+ long size = maxEventsInMemory / MAX_PARTITION_EVENTS;
+ LOG.info("maxEventsInMemory: {}, partition size: {}, number of
partitions: {}",
+ maxEventsInMemory, MAX_PARTITION_EVENTS, size);
+ cache = SimpleWindowPartitionCache.<Long,
WindowPartition<T>>newBuilder()
+ .maximumSize(size)
+ .removalListener(new
WindowPartitionCache.RemovalListener<Long, WindowPartition<T>>() {
+ @Override
+ public void onRemoval(Long pid, WindowPartition<T> p,
WindowPartitionCache.RemovalCause removalCause) {
+ Objects.requireNonNull(pid, "Null partition id");
+ Objects.requireNonNull(p, "Null window partition");
+ LOG.debug("onRemoval for id '{}', WindowPartition
'{}'", pid, p);
+ try {
+ windowPartitionsLock.lock(pid);
+ if (p.isEmpty() && pid != latestPartitionId) {
+ // if the empty partition was not invalidated
by flush, but evicted from cache
+ if (removalCause !=
WindowPartitionCache.RemovalCause.EXPLICIT) {
+ deletePartition(pid);
+ windowPartitionsState.delete(pid);
+ }
+ } else if (p.isModified()) {
+ windowPartitionsState.put(pid, p);
+ } else {
+ LOG.debug("WindowPartition '{}' is not
modified", pid);
+ }
+ } finally {
+ windowPartitionsLock.unlock(pid);
+ }
+ }
+ }).build(new WindowPartitionCache.CacheLoader<Long,
WindowPartition<T>>() {
+ @Override
+ public WindowPartition<T> load(Long id) {
+ LOG.debug("Load partition: {}", id);
+ // load from state
+ try {
+ windowPartitionsLock.lock(id);
+ return windowPartitionsState.get(id, new
WindowPartition<>(id));
+ } finally {
+ windowPartitionsLock.unlock(id);
+ }
+ }
+ });
+ }
+
+ private void deletePartition(long pid) {
+ LOG.debug("Delete partition: {}", pid);
+ try {
+ partitionIdsLock.lock();
+ partitionIds.remove(pid);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ }
+
+ private long getNextPartitionId() {
+ try {
+ partitionIdsLock.lock();
+ partitionIds.add(++latestPartitionId);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ return latestPartitionId;
+ }
+
+ private WindowPartition<T> getPinnedPartition(long id) {
+ return cache.pinAndGet(id);
+ }
+
+ private void flush() {
+ LOG.debug("Flushing modified partitions");
+ cache.asMap().forEach((pid, p) -> {
+ Long pidToInvalidate = null;
+ try {
+ windowPartitionsLock.lock(pid);
+ if (p.isEmpty() && pid != latestPartitionId) {
+ LOG.debug("Invalidating empty partition {}", pid);
+ deletePartition(pid);
+ windowPartitionsState.delete(pid);
+ pidToInvalidate = pid;
+ } else if (p.isModified()) {
+ LOG.debug("Updating modified partition {}", pid);
+ p.clearModified();
+ windowPartitionsState.put(pid, p);
+ }
+ } finally {
+ windowPartitionsLock.unlock(pid);
+ }
+ // invalidate after releasing the lock
+ if (pidToInvalidate != null) {
+ cache.invalidate(pidToInvalidate);
--- End diff --
If this id is pinned when this happens, when is it removed?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---