This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 798672f  MINOR: Remove redundant LocalLogManager (#10325)
798672f is described below

commit 798672fa141d1ba53d49844f9ac7d2c8060381fa
Author: dengziming <[email protected]>
AuthorDate: Fri Apr 2 09:37:10 2021 +0800

    MINOR: Remove redundant LocalLogManager (#10325)
    
    Remove an unused copy of LocalLogManager (a test class) that ended
    up in the main code directory.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../org/apache/kafka/metalog/LocalLogManager.java  | 391 ---------------------
 1 file changed, 391 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
deleted file mode 100644
index 819de56..0000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metadata.ApiMessageAndVersion;
-import org.apache.kafka.queue.EventQueue;
-import org.apache.kafka.queue.KafkaEventQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-/**
- * The LocalLogManager is a test implementation that relies on the contents of memory.
- */
-public final class LocalLogManager implements MetaLogManager, AutoCloseable {
-    interface LocalBatch {
-        int size();
-    }
-
-    static class LeaderChangeBatch implements LocalBatch {
-        private final MetaLogLeader newLeader;
-
-        LeaderChangeBatch(MetaLogLeader newLeader) {
-            this.newLeader = newLeader;
-        }
-
-        @Override
-        public int size() {
-            return 1;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof LeaderChangeBatch)) return false;
-            LeaderChangeBatch other = (LeaderChangeBatch) o;
-            if (!other.newLeader.equals(newLeader)) return false;
-            return true;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(newLeader);
-        }
-
-        @Override
-        public String toString() {
-            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
-        }
-    }
-
-    static class LocalRecordBatch implements LocalBatch {
-        private final List<ApiMessage> records;
-
-        LocalRecordBatch(List<ApiMessage> records) {
-            this.records = records;
-        }
-
-        @Override
-        public int size() {
-            return records.size();
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof LocalRecordBatch)) return false;
-            LocalRecordBatch other = (LocalRecordBatch) o;
-            if (!other.records.equals(records)) return false;
-            return true;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(records);
-        }
-
-        @Override
-        public String toString() {
-            return "LocalRecordBatch(records=" + records + ")";
-        }
-    }
-
-    public static class SharedLogData {
-    private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
-    private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
-        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
-        private MetaLogLeader leader = new MetaLogLeader(-1, -1);
-        private long prevOffset = -1;
-
-        synchronized void registerLogManager(LocalLogManager logManager) {
-            if (logManagers.put(logManager.nodeId(), logManager) != null) {
-                throw new RuntimeException("Can't have multiple 
LocalLogManagers " +
-                    "with id " + logManager.nodeId());
-            }
-            electLeaderIfNeeded();
-        }
-
-        synchronized void unregisterLogManager(LocalLogManager logManager) {
-            if (!logManagers.remove(logManager.nodeId(), logManager)) {
-                throw new RuntimeException("Log manager " + 
logManager.nodeId() +
-                    " was not found.");
-            }
-        }
-
-        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
-            if (epoch != leader.epoch()) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch 
does not " +
-                    "match the current leader epoch of {}.", nodeId, epoch, 
leader.epoch());
-                return Long.MAX_VALUE;
-            }
-            if (nodeId != leader.nodeId()) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the given node id 
does not " +
-                    "match the current leader id of {}.", nodeId, epoch, 
leader.nodeId());
-                return Long.MAX_VALUE;
-            }
-            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
-            long offset = append(batch);
-            electLeaderIfNeeded();
-            return offset;
-        }
-
-        synchronized long append(LocalBatch batch) {
-            prevOffset += batch.size();
-            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
-            batches.put(prevOffset, batch);
-            if (batch instanceof LeaderChangeBatch) {
-                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
-                leader = leaderChangeBatch.newLeader;
-            }
-            for (LocalLogManager logManager : logManagers.values()) {
-                logManager.scheduleLogCheck();
-            }
-            return prevOffset;
-        }
-
-        synchronized void electLeaderIfNeeded() {
-            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
-                return;
-            }
-            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
-            Iterator<Integer> iter = logManagers.keySet().iterator();
-            Integer nextLeaderNode = null;
-            for (int i = 0; i <= nextLeaderIndex; i++) {
-                nextLeaderNode = iter.next();
-            }
-            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
-            log.info("Elected new leader: {}.", newLeader);
-            append(new LeaderChangeBatch(newLeader));
-        }
-
-        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
-            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
-            if (entry == null) {
-                return null;
-            }
-            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
-        }
-    }
-
-    private static class MetaLogListenerData {
-        private long offset = -1;
-        private final MetaLogListener listener;
-
-        MetaLogListenerData(MetaLogListener listener) {
-            this.listener = listener;
-        }
-    }
-
-    private final Logger log;
-
-    private final int nodeId;
-
-    private final SharedLogData shared;
-
-    private final EventQueue eventQueue;
-
-    private boolean initialized = false;
-
-    private boolean shutdown = false;
-
-    private long maxReadOffset = Long.MAX_VALUE;
-
-    private final List<MetaLogListenerData> listeners = new ArrayList<>();
-
-    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
-
-    public LocalLogManager(LogContext logContext,
-                           int nodeId,
-                           SharedLogData shared,
-                           String threadNamePrefix) {
-        this.log = logContext.logger(LocalLogManager.class);
-        this.nodeId = nodeId;
-        this.shared = shared;
-        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
-        shared.registerLogManager(this);
-    }
-
-    private void scheduleLogCheck() {
-        eventQueue.append(() -> {
-            try {
-                log.debug("Node {}: running log check.", nodeId);
-                int numEntriesFound = 0;
-                for (MetaLogListenerData listenerData : listeners) {
-                    while (true) {
-                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
-                        if (entry == null) {
-                            log.trace("Node {}: reached the end of the log 
after finding " +
-                                "{} entries.", nodeId, numEntriesFound);
-                            break;
-                        }
-                        long entryOffset = entry.getKey();
-                        if (entryOffset > maxReadOffset) {
-                            log.trace("Node {}: after {} entries, not reading 
the next " +
-                                "entry because its offset is {}, and 
maxReadOffset is {}.",
-                                nodeId, numEntriesFound, entryOffset, 
maxReadOffset);
-                            break;
-                        }
-                        if (entry.getValue() instanceof LeaderChangeBatch) {
-                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
-                            log.trace("Node {}: handling LeaderChange to {}.",
-                                nodeId, batch.newLeader);
-                            listenerData.listener.handleNewLeader(batch.newLeader);
-                            if (batch.newLeader.epoch() > leader.epoch()) {
-                                leader = batch.newLeader;
-                            }
-                        } else if (entry.getValue() instanceof LocalRecordBatch) {
-                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
-                            log.trace("Node {}: handling LocalRecordBatch with 
offset {}.",
-                                nodeId, entryOffset);
-                            listenerData.listener.handleCommits(entryOffset, batch.records);
-                        }
-                        numEntriesFound++;
-                        listenerData.offset = entryOffset;
-                    }
-                }
-                log.trace("Completed log check for node " + nodeId);
-            } catch (Exception e) {
-                log.error("Exception while handling log check", e);
-            }
-        });
-    }
-
-    public void beginShutdown() {
-        eventQueue.beginShutdown("beginShutdown", () -> {
-            try {
-                if (initialized && !shutdown) {
-                    log.debug("Node {}: beginning shutdown.", nodeId);
-                    renounce(leader.epoch());
-                    for (MetaLogListenerData listenerData : listeners) {
-                        listenerData.listener.beginShutdown();
-                    }
-                    shared.unregisterLogManager(this);
-                }
-            } catch (Exception e) {
-                log.error("Unexpected exception while sending beginShutdown 
callbacks", e);
-            }
-            shutdown = true;
-        });
-    }
-
-    @Override
-    public void close() throws InterruptedException {
-        log.debug("Node {}: closing.", nodeId);
-        beginShutdown();
-        eventQueue.close();
-    }
-
-    @Override
-    public void initialize() throws Exception {
-        eventQueue.append(() -> {
-            log.debug("initialized local log manager for node " + nodeId);
-            initialized = true;
-        });
-    }
-
-    @Override
-    public void register(MetaLogListener listener) throws Exception {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        eventQueue.append(() -> {
-            if (shutdown) {
-                log.info("Node {}: can't register because local log manager 
has " +
-                    "already been shut down.", nodeId);
-                future.complete(null);
-            } else if (initialized) {
-                log.info("Node {}: registered MetaLogListener.", nodeId);
-                listeners.add(new MetaLogListenerData(listener));
-                shared.electLeaderIfNeeded();
-                scheduleLogCheck();
-                future.complete(null);
-            } else {
-                log.info("Node {}: can't register because local log manager 
has not " +
-                    "been initialized.", nodeId);
-                future.completeExceptionally(new RuntimeException(
-                    "LocalLogManager was not initialized."));
-            }
-        });
-        future.get();
-    }
-
-    @Override
-    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return scheduleAtomicWrite(epoch, batch);
-    }
-
-    @Override
-    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(
-            nodeId,
-            leader.epoch(),
-            new LocalRecordBatch(
-                batch
-                    .stream()
-                    .map(ApiMessageAndVersion::message)
-                    .collect(Collectors.toList())
-            )
-        );
-    }
-
-    @Override
-    public void renounce(long epoch) {
-        MetaLogLeader curLeader = leader;
-        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
-    }
-
-    @Override
-    public MetaLogLeader leader() {
-        return leader;
-    }
-
-    @Override
-    public int nodeId() {
-        return nodeId;
-    }
-
-    public List<MetaLogListener> listeners() {
-        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
-        eventQueue.append(() -> {
-            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
-        });
-        try {
-            return future.get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void setMaxReadOffset(long maxReadOffset) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        eventQueue.append(() -> {
-            log.trace("Node {}: set maxReadOffset to {}.", nodeId, 
maxReadOffset);
-            this.maxReadOffset = maxReadOffset;
-            scheduleLogCheck();
-            future.complete(null);
-        });
-        try {
-            future.get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}