This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d24254e146a [feature](journal) Add a method to write a set of journals
in batch (#30380)
d24254e146a is described below
commit d24254e146a06d8b2b9a65de0fbdb55ea9735a29
Author: walter <[email protected]>
AuthorDate: Thu Jan 25 22:26:29 2024 +0800
[feature](journal) Add a method to write a set of journals in batch (#30380)
---
.../java/org/apache/doris/journal/Journal.java | 5 ++
.../org/apache/doris/journal/JournalBatch.java | 81 ++++++++++++++++++
.../apache/doris/journal/bdbje/BDBJEJournal.java | 78 +++++++++++++++++
.../apache/doris/journal/local/LocalJournal.java | 12 +++
.../doris/persist/EditLogFileOutputStream.java | 5 ++
.../apache/doris/persist/EditLogOutputStream.java | 2 +
.../doris/journal/bdbje/BDBJEJournalTest.java | 97 ++++++++++++++++++++++
7 files changed, 280 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
index 8fca299df19..b5b37a80ef0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
@@ -49,6 +49,11 @@ public interface Journal {
// Write a journal and sync to disk
public long write(short op, Writable writable) throws IOException;
+ // Write a set of journals to disk in batch.
+ //
+ // Return the first id of the batched journals.
+ public long write(JournalBatch batch) throws IOException;
+
// Get current journal number
public long getJournalNum();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
new file mode 100644
index 00000000000..0fc0ccf9355
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.journal;
+
+import org.apache.doris.common.io.DataOutputBuffer;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.OperationType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+ /**
+  * Collects journals that are serialized eagerly, so that they can later be written in one batch.
+  */
+ public class JournalBatch {
+     private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
+
+     private ArrayList<Entity> entities;
+
+     /** Creates an empty batch backed by a list with the default capacity. */
+     public JournalBatch() {
+         this.entities = new ArrayList<>();
+     }
+
+     /**
+      * Creates an empty batch.
+      *
+      * @param cap initial capacity of the backing list
+      */
+     public JournalBatch(int cap) {
+         this.entities = new ArrayList<>(cap);
+     }
+
+     /**
+      * Wraps {@code op} and {@code data} in a {@link JournalEntity}, serializes that entity into a
+      * fresh buffer and appends the buffer to this batch.
+      *
+      * <p>Only the serialized form is kept, so it is safe to update the data object once this
+      * function has returned.
+      *
+      * @param op operation code of the journal; {@code OperationType.OP_TIMESTAMP} is rejected
+      * @param data payload to serialize
+      * @throws IOException if serializing the entity fails
+      * @throws RuntimeException if {@code op} is {@code OperationType.OP_TIMESTAMP}
+      */
+     public void addJournal(short op, Writable data) throws IOException {
+         final boolean isTimestamp = (op == OperationType.OP_TIMESTAMP);
+         if (isTimestamp) {
+             // OP_TIMESTAMP is not supported, see `BDBJEJournal.write` for details.
+             throw new RuntimeException("JournalBatch.addJournal is not supported OP_TIMESTAMP");
+         }
+
+         JournalEntity journalEntity = new JournalEntity();
+         journalEntity.setOpCode(op);
+         journalEntity.setData(data);
+
+         DataOutputBuffer serialized = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
+         journalEntity.write(serialized);
+
+         Entity item = new Entity(op, serialized);
+         this.entities.add(item);
+     }
+
+     /**
+      * Returns the internal list of serialized journals, in insertion order.
+      *
+      * <p>The list itself is returned, not a copy.
+      */
+     public ArrayList<Entity> getJournalEntities() {
+         return this.entities;
+     }
+
+     /** One serialized journal: its operation code plus the buffer it was written into. */
+     public static class Entity {
+         short op;
+         DataOutputBuffer data;
+
+         Entity(short op, DataOutputBuffer data) {
+             this.op = op;
+             this.data = data;
+         }
+
+         /** Returns the operation code this journal was added with. */
+         public short getOpCode() {
+             return this.op;
+         }
+
+         /**
+          * Returns the bytes exposed by the underlying buffer.
+          *
+          * <p>NOTE(review): this forwards {@code DataOutputBuffer.getData()} unchanged; if that is the
+          * raw backing array it may be longer than the bytes actually written -- confirm.
+          */
+         public byte[] getBinaryData() {
+             return this.data.getData();
+         }
+     }
+ }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 3698b8f76ca..95efbfa3780 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.metric.MetricRepo;
@@ -121,6 +122,83 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
}
}
+ @Override
+ public synchronized long write(JournalBatch batch) throws IOException {
+ List<JournalBatch.Entity> entities = batch.getJournalEntities();
+ int entitySize = entities.size();
+ long dataSize = 0;
+ long firstId = nextJournalId.getAndAdd(entitySize);
+
+ // Write the journals to bdb.
+ for (int i = 0; i < RETRY_TIME; i++) {
+ Transaction txn = null;
+ try {
+ // The default config is constructed from the configs of
environment.
+ txn =
bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, null);
+ for (int j = 0; j < entitySize; ++j) {
+ JournalBatch.Entity entity = entities.get(j);
+ DatabaseEntry theKey = idToKey(firstId + j);
+ DatabaseEntry theData = new
DatabaseEntry(entity.getBinaryData());
+ currentJournalDB.put(txn, theKey, theData); // Put with
overwrite, it always success
+ dataSize += theData.getSize();
+ if (i == 0) {
+ LOG.debug("opCode = {}, journal size = {}",
entity.getOpCode(), theData.getSize());
+ }
+ }
+
+ txn.commit();
+ txn = null;
+
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+
MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+ }
+
+ return firstId;
+ } catch (ReplicaWriteException e) {
+ /**
+ * This exception indicates that an update operation or
transaction commit
+ * or abort was attempted while in the
+ * {@link ReplicatedEnvironment.State#REPLICA} state. The
transaction is marked
+ * as being invalid.
+ * <p>
+ * The exception is the result of either an error in the
application logic or
+ * the result of a transition of the node from Master to
Replica while a
+ * transaction was in progress.
+ * <p>
+ * The application must abort the current transaction and
redirect all
+ * subsequent update operations to the Master.
+ */
+ LOG.error("catch ReplicaWriteException when writing to
database, will exit. the first journal id {}",
+ firstId, e);
+ String msg = "write bdb failed. will exit. the first
journalId: " + firstId + ", bdb database Name: "
+ + currentJournalDB.getDatabaseName();
+ LOG.error(msg);
+ Util.stdoutWithTime(msg);
+ System.exit(-1);
+ } catch (DatabaseException e) {
+ LOG.error("catch an exception when writing to database. sleep
and retry. the first journal id {}",
+ firstId, e);
+ try {
+ Thread.sleep(5 * 1000);
+ } catch (InterruptedException e1) {
+ LOG.warn("", e1);
+ }
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ }
+ }
+ }
+
+ String msg = "write bdb failed. will exit. the first journalId: " +
firstId + ", bdb database Name: "
+ + currentJournalDB.getDatabaseName();
+ LOG.error(msg);
+ Util.stdoutWithTime(msg);
+ System.exit(-1);
+ return 0; // unreachable!
+ }
+
@Override
public synchronized long write(short op, Writable writable) throws
IOException {
JournalEntity entity = new JournalEntity();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
index 9ba3274e0f0..45ae377a401 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
@@ -19,6 +19,7 @@ package org.apache.doris.journal.local;
import org.apache.doris.common.io.Writable;
import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.persist.EditLogFileOutputStream;
@@ -139,6 +140,17 @@ public class LocalJournal implements Journal {
return cursor;
}
+ @Override
+ public synchronized long write(JournalBatch batch) throws IOException {
+ List<JournalBatch.Entity> entities = batch.getJournalEntities();
+ for (JournalBatch.Entity entity : entities) {
+ outputStream.write(entity.getOpCode(), entity.getBinaryData());
+ }
+ outputStream.setReadyToFlush();
+ outputStream.flush();
+ return journalId.getAndAdd(entities.size());
+ }
+
@Override
public synchronized long write(short op, Writable writable) throws
IOException {
outputStream.write(op, writable);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
index 3d8b360704e..d98f66bacd1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
@@ -65,6 +65,11 @@ public class EditLogFileOutputStream extends
EditLogOutputStream {
writable.write(bufCurrent);
}
+ /**
+  * Appends an operation whose payload is already available as bytes to the current buffer.
+  *
+  * @param op the operation code, written first as a short
+  * @param data the payload bytes, written right after the operation code
+  * @throws IOException if writing to the buffer fails
+  */
+ @Override
+ public void write(short op, byte[] data) throws IOException {
+     bufCurrent.writeShort(op);
+     bufCurrent.write(data);
+ }
+
// Create empty edits logs file.
void create() throws IOException {
fc.truncate(0);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
index 95b746e0cb2..e7f966b397e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
@@ -51,6 +51,8 @@ public abstract class EditLogOutputStream extends
OutputStream {
*/
public abstract void write(short op, Writable writable) throws IOException;
+ /**
+  * Write an operation whose payload is given as raw bytes instead of a Writable.
+  *
+  * @param op the operation code
+  * @param data the payload bytes of the operation
+  * @throws IOException if the operation cannot be written
+  */
+ public abstract void write(short op, byte[] data) throws IOException;
+
abstract void create() throws IOException;
public abstract void close() throws IOException;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index ba81d6697ba..edcbf9c033c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.persist.OperationType;
@@ -228,4 +229,100 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
Assertions.assertEquals(21, journal.getDatabaseNames().get(0));
journal.close();
}
+
+ // Verifies batched writes: opens a journal on a temporary BDB directory, writes two batches
+ // of 10 journals each and checks the returned first ids (1, then 11) and the id bookkeeping
+ // of the journal after the first batch.
+ @RepeatedTest(1)
+ public void testJournalBatch() throws Exception {
+ int port = findValidPort();
+ Preconditions.checkArgument(((port > 0) && (port < 65535)));
+ String nodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ long replayedJournalId = 0;
+ File tmpDir = createTmpDir();
+ // Mock Env so that the journal uses the temporary directory and this node as both
+ // self node and helper node.
+ new MockUp<Env>() {
+ HostInfo selfNode = new HostInfo("127.0.0.1", port);
+ @Mock
+ public String getBdbDir() {
+ return tmpDir.getAbsolutePath();
+ }
+
+ @Mock
+ public HostInfo getSelfNode() {
+ return this.selfNode;
+ }
+
+ @Mock
+ public HostInfo getHelperNode() {
+ return this.selfNode;
+ }
+
+ @Mock
+ public boolean isElectable() {
+ return true;
+ }
+
+ @Mock
+ public long getReplayedJournalId() {
+ return replayedJournalId;
+ }
+ };
+
+ // NOTE(review): the "selfNode:{}" placeholder is filled with getBdbDir() a second time;
+ // presumably the self node was meant to be logged here -- confirm.
+ LOG.info("BdbDir:{}, selfNode:{}, nodeName:{}",
Env.getServingEnv().getBdbDir(),
+ Env.getServingEnv().getBdbDir(), nodeName);
+ Assertions.assertEquals(tmpDir.getAbsolutePath(),
Env.getServingEnv().getBdbDir());
+ BDBJEJournal journal = new BDBJEJournal(nodeName);
+ journal.open();
+ // BDBEnvironment need several seconds election from unknown to master
+ // Poll once per second, for at most 10 seconds.
+ for (int i = 0; i < 10; i++) {
+ if
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.MASTER)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+
+ journal.rollJournal();
+ // First batch: 10 journals, each carrying a distinct string payload.
+ JournalBatch batch = new JournalBatch(10);
+ for (int i = 0; i < 10; i++) {
+ String data = "JournalBatch item " + i;
+ Writable writable = new Writable() {
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, data);
+ }
+ };
+ // CREATE_MTMV_JOB is deprecated, and safe to write any data.
+ batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+ }
+ // write(batch) returns the id of the first journal in the batch.
+ long journalId = journal.write(batch);
+ Assertions.assertEquals(1, journalId);
+
+ // After 10 journals the ids 1..10 are expected to be in use.
+ Assertions.assertEquals(10, journal.getMaxJournalId());
+ Assertions.assertEquals(10, journal.getJournalNum());
+ Assertions.assertEquals(1, journal.getMinJournalId());
+ Assertions.assertEquals(0, journal.getFinalizedJournalId());
+
+ LOG.debug("journal.getDatabaseNames(): {}",
journal.getDatabaseNames());
+ Assertions.assertEquals(1, journal.getDatabaseNames().size());
+ Assertions.assertEquals(1, journal.getDatabaseNames().get(0));
+
+ // Read the first journal back and check that its op code survived the round trip.
+ JournalEntity journalEntity = journal.read(1);
+ Assertions.assertEquals(OperationType.OP_CREATE_MTMV_JOB,
journalEntity.getOpCode());
+
+ // Second batch: its first id must continue right after the first batch.
+ batch = new JournalBatch(10);
+ for (int i = 0; i < 10; i++) {
+ String data = "JournalBatch 2 item " + i;
+ Writable writable = new Writable() {
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, data);
+ }
+ };
+ batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+ }
+ journalId = journal.write(batch);
+ Assertions.assertEquals(11, journalId);
+
+ journal.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]