eolivelli commented on code in PR #3493:
URL: https://github.com/apache/bookkeeper/pull/3493#discussion_r973713300
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -333,63 +381,84 @@ public void onRotateEntryLog() {
// flushed to the entry log file.
}
- BookieStateManager getStateManager(){
- return (BookieStateManager) stateManager;
- }
-
- public DefaultEntryLogger getEntryLogger() {
- return interleavedLedgerStorage.getEntryLogger();
+ public List<DefaultEntryLogger> getEntryLogger() {
Review Comment:
getDefaultEntryLoggers
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -333,63 +381,84 @@ public void onRotateEntryLog() {
// flushed to the entry log file.
}
- BookieStateManager getStateManager(){
- return (BookieStateManager) stateManager;
- }
-
- public DefaultEntryLogger getEntryLogger() {
- return interleavedLedgerStorage.getEntryLogger();
+ public List<DefaultEntryLogger> getEntryLogger() {
+ List<DefaultEntryLogger> listIt = new ArrayList<>(numberOfDirs);
+ for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) {
+ listIt.add(ls.getEntryLogger());
+ }
+ return listIt;
}
@Override
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long
lastLedgerId) throws IOException {
- return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId,
lastLedgerId);
+ List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
+ for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) {
+ listIt.add(ls.getActiveLedgersInRange(firstLedgerId,
lastLedgerId));
+ }
+ return Iterables.concat(listIt);
}
@Override
public void updateEntriesLocations(Iterable<EntryLocation> locations)
throws IOException {
- interleavedLedgerStorage.updateEntriesLocations(locations);
+ for (InterleavedLedgerStorage s : interleavedLedgerStorageList) {
+ s.updateEntriesLocations(locations);
+ }
}
@Override
public void flushEntriesLocationsIndex() throws IOException {
- interleavedLedgerStorage.flushEntriesLocationsIndex();
+ for (InterleavedLedgerStorage s : interleavedLedgerStorageList) {
+ s.flushEntriesLocationsIndex();
+ }
}
@Override
public LedgerStorage getUnderlyingLedgerStorage() {
- return interleavedLedgerStorage;
+ return interleavedLedgerStorageList.get(0);
}
@Override
public void forceGC() {
- interleavedLedgerStorage.forceGC();
+ interleavedLedgerStorageList.forEach(s -> s.forceGC());
Review Comment:
If one of the instances throws an error we won't process the others.
I suggest to wrap the error and continue processing.
Then, if one error occurred we throw it in the end.
The same here and in other similar methods
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -310,16 +355,19 @@ public void onSizeLimitReached(final Checkpoint cp)
throws IOException {
scheduler.execute(new Runnable() {
@Override
public void run() {
- try {
- LOG.info("Started flushing mem table.");
-
interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
- memTable.flush(SortedLedgerStorage.this);
- if
(interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
-
interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
+ for (InterleavedLedgerStorage s :
interleavedLedgerStorageList) {
+ try {
+ LOG.info("Started flushing mem table.");
Review Comment:
This line will be logged multiple times. Can we log something about 's'?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -56,18 +63,22 @@
EntryMemTable memTable;
private ScheduledExecutorService scheduler;
- private StateManager stateManager;
private ServerConfiguration conf;
private StatsLogger statsLogger;
- private final InterleavedLedgerStorage interleavedLedgerStorage;
+ protected final List<InterleavedLedgerStorage>
interleavedLedgerStorageList;
+ private int numberOfDirs;
+ private String interleavedLedgerStorageClazz =
InterleavedLedgerStorage.class.getName();
public SortedLedgerStorage() {
- this(new InterleavedLedgerStorage());
+ this(null);
}
@VisibleForTesting
- protected SortedLedgerStorage(InterleavedLedgerStorage ils) {
- interleavedLedgerStorage = ils;
+ protected SortedLedgerStorage(String interleavedLedgerStorageClazz) {
Review Comment:
Why do we pass a class name here?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -333,63 +381,84 @@ public void onRotateEntryLog() {
// flushed to the entry log file.
}
- BookieStateManager getStateManager(){
- return (BookieStateManager) stateManager;
- }
-
- public DefaultEntryLogger getEntryLogger() {
- return interleavedLedgerStorage.getEntryLogger();
+ public List<DefaultEntryLogger> getEntryLogger() {
+ List<DefaultEntryLogger> listIt = new ArrayList<>(numberOfDirs);
+ for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) {
+ listIt.add(ls.getEntryLogger());
+ }
+ return listIt;
}
@Override
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long
lastLedgerId) throws IOException {
- return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId,
lastLedgerId);
+ List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
+ for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) {
+ listIt.add(ls.getActiveLedgersInRange(firstLedgerId,
lastLedgerId));
+ }
+ return Iterables.concat(listIt);
}
@Override
public void updateEntriesLocations(Iterable<EntryLocation> locations)
throws IOException {
- interleavedLedgerStorage.updateEntriesLocations(locations);
+ for (InterleavedLedgerStorage s : interleavedLedgerStorageList) {
+ s.updateEntriesLocations(locations);
+ }
}
@Override
public void flushEntriesLocationsIndex() throws IOException {
- interleavedLedgerStorage.flushEntriesLocationsIndex();
+ for (InterleavedLedgerStorage s : interleavedLedgerStorageList) {
+ s.flushEntriesLocationsIndex();
+ }
}
@Override
public LedgerStorage getUnderlyingLedgerStorage() {
- return interleavedLedgerStorage;
+ return interleavedLedgerStorageList.get(0);
Review Comment:
I am not sure that this is correct.
Is this method only for tests?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java:
##########
@@ -310,16 +355,19 @@ public void onSizeLimitReached(final Checkpoint cp)
throws IOException {
scheduler.execute(new Runnable() {
@Override
public void run() {
- try {
- LOG.info("Started flushing mem table.");
-
interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
- memTable.flush(SortedLedgerStorage.this);
- if
(interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
-
interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
+ for (InterleavedLedgerStorage s :
interleavedLedgerStorageList) {
+ try {
+ LOG.info("Started flushing mem table.");
+ s.getEntryLogger().prepareEntryMemTableFlush();
+ memTable.flush(SortedLedgerStorage.this);
+
+ if (s.getEntryLogger().commitEntryMemTableFlush()) {
+ s.checkpointer.startCheckpoint(cp);
+ }
+ } catch (Exception e) {
+ s.getStateManager().transitionToReadOnlyMode();
Review Comment:
Also here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]