Author: chirino
Date: Mon Nov 10 12:41:23 2008
New Revision: 712827
URL: http://svn.apache.org/viewvc?rev=712827&view=rev
Log:
More progress on replication.. bulk file sync and real-time sync now seem to
be working. Need to work on handling all the failure conditions now.
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
Removed:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ha/command/
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Mon Nov 10 12:41:23 2008
@@ -347,7 +347,8 @@
//
// is it just 1 big write?
- if (wb.size == write.location.getSize()) {
+ ReplicationTarget replicationTarget =
dataManager.getReplicationTarget();
+ if (wb.size == write.location.getSize() &&
replicationTarget==null) {
forceToDisk = write.sync | write.onComplete != null;
// Just write it directly..
@@ -360,7 +361,7 @@
} else {
- // Combine the smaller writes into 1 big buffer
+ // We are going to do 1 big write.
while (write != null) {
forceToDisk |= write.sync | write.onComplete != null;
@@ -377,6 +378,11 @@
// Now do the 1 big write.
ByteSequence sequence = buff.toByteSequence();
file.write(sequence.getData(), sequence.getOffset(),
sequence.getLength());
+
+ if( replicationTarget!=null ) {
+
replicationTarget.replicate(wb.writes.getHead().location, sequence,
forceToDisk);
+ }
+
buff.reset();
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Mon Nov 10 12:41:23 2008
@@ -93,7 +93,7 @@
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected DataFileAppender appender;
- protected DataFileAccessorPool accessorPool = new
DataFileAccessorPool(this);
+ protected DataFileAccessorPool accessorPool;
protected Map<Integer, DataFile> fileMap = new HashMap<Integer,
DataFile>();
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File,
DataFile>();
@@ -104,6 +104,7 @@
protected Runnable cleanupTask;
protected final AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
+ private ReplicationTarget replicationTarget;
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
@@ -111,6 +112,7 @@
return;
}
+ accessorPool = new DataFileAccessorPool(this);
started = true;
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() -
PREFERED_DIFF);
lock();
@@ -176,8 +178,11 @@
}
}
- storeState(false);
-
+ ByteSequence storedState = storeState(true);
+ if( dataFiles.isEmpty() ) {
+ appender.storeItem(storedState, Location.MARK_TYPE, true);
+ }
+
cleanupTask = new Runnable() {
public void run() {
cleanup();
@@ -317,6 +322,9 @@
fileByFileMap.clear();
controlFile.unlock();
controlFile.dispose();
+ controlFile=null;
+ dataFiles.clear();
+ lastAppendLocation.set(null);
started = false;
}
@@ -413,6 +421,11 @@
return mark;
}
+ public synchronized void appendedExternally(Location loc, int length)
throws IOException {
+ DataFile dataFile = getDataFile(loc);
+ dataFile.incrementLength(length);
+ }
+
public synchronized Location getNextLocation(Location location) throws
IOException, IllegalStateException {
Location cur = null;
@@ -541,10 +554,10 @@
storeState(sync);
}
- protected synchronized void storeState(boolean sync) throws IOException {
+ protected synchronized ByteSequence storeState(boolean sync) throws
IOException {
ByteSequence state = marshallState();
- appender.storeItem(state, Location.MARK_TYPE, sync);
controlFile.store(state, sync);
+ return state;
}
public synchronized Location write(ByteSequence data, boolean sync) throws
IOException, IllegalStateException {
@@ -660,4 +673,12 @@
return rc;
}
+ public void setReplicationTarget(ReplicationTarget replicationTarget) {
+ this.replicationTarget = replicationTarget;
+ }
+ public ReplicationTarget getReplicationTarget() {
+ return replicationTarget;
+ }
+
+
}
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java?rev=712827&view=auto
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
(added)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
Mon Nov 10 12:41:23 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import org.apache.kahadb.util.ByteSequence;
+
+public interface ReplicationTarget {
+
+ void replicate(Location location, ByteSequence sequence, boolean sync);
+
+}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Mon Nov 10 12:41:23 2008
@@ -457,11 +457,11 @@
return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
}
- private File getFreeFile() {
+ public File getFreeFile() {
return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
}
- private File getRecoveryFile() {
+ public File getRecoveryFile() {
return new File(directory, IOHelper.toFileSystemSafeName(name)+".rec");
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
Mon Nov 10 12:41:23 2008
@@ -23,6 +23,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
@@ -38,15 +39,20 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.ReplicationTarget;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
import org.apache.kahadb.replication.pb.PBType;
import org.apache.kahadb.store.KahaDBStore;
+import org.apache.kahadb.util.ByteSequence;
-public class ReplicationMaster implements Service, ClusterListener {
+import com.google.protobuf.ByteString;
+
+public class ReplicationMaster implements Service, ClusterListener,
ReplicationTarget {
private static final Log LOG =
LogFactory.getLog(ReplicationServer.class);
@@ -55,7 +61,7 @@
private Object serverMutex = new Object() {
};
private TransportServer server;
- private ArrayList<ReplicationSession> sessions = new
ArrayList<ReplicationSession>();
+ private CopyOnWriteArrayList<ReplicationSession> sessions = new
CopyOnWriteArrayList<ReplicationSession>();
public ReplicationMaster(ReplicationServer replication1Server) {
this.replicationServer = replication1Server;
@@ -83,6 +89,7 @@
});
server.start();
}
+
replicationServer.getStore().getJournal().setReplicationTarget(this);
}
public void stop() throws Exception {
@@ -95,8 +102,34 @@
}
public void onClusterChange(ClusterState config) {
- // TODO: if a slave is removed from the cluster, we should
- // remove it's replication tracking info.
+ }
+
+
+ /**
+ * This is called by the Journal so that we can replicate the update to
the
+ * slaves.
+ */
+ @Override
+ public void replicate(Location location, ByteSequence sequence, boolean
sync) {
+ if( sessions.isEmpty() )
+ return;
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+ PBJournalUpdate payload = new PBJournalUpdate();
+ payload.setLocation(convert(location));
+ payload.setData(ByteString.copyFrom(sequence.getData(),
sequence.getOffset(), sequence.getLength()));
+ frame.setPayload(payload);
+
+ for (ReplicationSession session : sessions) {
+ if( session.subscribedToJournalUpdates.get() ) {
+ // TODO: use async send threads so that the
frames can be pushed out in parallel.
+ try {
+ session.transport.oneway(frame);
+ } catch (IOException e) {
+ session.onException(e);
+ }
+ }
+ }
}
class ReplicationSession implements Service, TransportListener {
@@ -171,8 +204,13 @@
// synced to disk, but while a lock is
still held on the
// store so that no
// updates are allowed.
-
ArrayList<PBFileInfo> infos = new
ArrayList<PBFileInfo>();
+
+ Map<Integer, DataFile> journalFiles =
store.getJournal().getFileMap();
+ for (DataFile df :
journalFiles.values()) {
+
infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(),
df.getFile(), df.getLength()));
+ }
+
SnapshotStatus snapshot =
createSnapshot();
PBFileInfo databaseInfo = new
PBFileInfo();
databaseInfo.setName("database");
@@ -181,11 +219,7 @@
databaseInfo.setEnd(snapshot.size);
databaseInfo.setChecksum(snapshot.checksum);
infos.add(databaseInfo);
-
- Map<Integer, DataFile> journalFiles =
store.getJournal().getFileMap();
- for (DataFile df :
journalFiles.values()) {
- infos.add(createInfo("journal/"
+ df.getDataFileId(), df.getFile(), df.getLength()));
- }
+
rcPayload.setCopyFilesList(infos);
}
});
@@ -284,25 +318,9 @@
}
}
- private PBFileInfo createInfo(String name, File file, int length)
throws IOException {
- PBFileInfo rc = new PBFileInfo();
- rc.setName(name);
- FileInputStream is = new FileInputStream(file);
- byte buffer[] = new byte[1024 * 4];
- int c;
-
- long size = 0;
- Checksum checksum = new Adler32();
- while (size < length && (c = is.read(buffer, 0, (int)
Math.min(length - size, buffer.length))) >= 0) {
- checksum.update(buffer, 0, c);
- size += c;
- }
- rc.setChecksum(checksum.getValue());
- rc.setStart(0);
- rc.setEnd(size);
- return rc;
- }
+
private void onJournalUpdateAck(ReplicationFrame frame,
PBJournalLocation journalLocation) {
}
+
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
Mon Nov 10 12:41:23 2008
@@ -17,12 +17,16 @@
package org.apache.kahadb.replication;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.store.KahaDBStore;
/**
@@ -82,6 +86,8 @@
private ClusterState clusterState;
+ private File tempReplicationDir;
+
public void start() throws Exception {
// The cluster will let us know about the cluster configuration,
// which lets us decide if we are going to be a slave or a
master.
@@ -112,6 +118,7 @@
// If the slave service was not yet
started.. start it up.
if (slave == null) {
LOG.info("Starting replication
slave.");
+ store.open();
slave = new
ReplicationSlave(this);
slave.start();
}
@@ -168,10 +175,10 @@
public File getReplicationFile(String fn) throws IOException {
if (fn.equals("database")) {
return getStore().getPageFile().getFile();
- } if (fn.startsWith("journal/")) {
+ } if (fn.startsWith("journal-")) {
int id;
try {
- id =
Integer.parseInt(fn.substring("journal/".length()));
+ id =
Integer.parseInt(fn.substring("journal-".length()));
} catch (NumberFormatException e) {
throw new IOException("Unknown replication file
name: "+fn);
}
@@ -181,6 +188,47 @@
}
}
+ public File getTempReplicationDir() {
+ if( tempReplicationDir == null ) {
+ tempReplicationDir = new File(
getStore().getDirectory(), "replication");
+ }
+ return tempReplicationDir;
+ }
+
+ public File getTempReplicationFile(String fn, int snapshotId) throws
IOException {
+ if (fn.equals("database")) {
+ return new File(getTempReplicationDir(),
"database-"+snapshotId);
+ } if (fn.startsWith("journal-")) {
+ int id;
+ try {
+ id =
Integer.parseInt(fn.substring("journal-".length()));
+ } catch (NumberFormatException e) {
+ throw new IOException("Unknown replication file
name: "+fn);
+ }
+ return new File(getTempReplicationDir(), fn);
+ } else {
+ throw new IOException("Unknown replication file name:
"+fn);
+ }
+ }
+
+ PBFileInfo createInfo(String name, File file, long length) throws
IOException {
+ PBFileInfo rc = new PBFileInfo();
+ rc.setName(name);
+ FileInputStream is = new FileInputStream(file);
+ byte buffer[] = new byte[1024 * 4];
+ int c;
+
+ long size = 0;
+ Checksum checksum = new Adler32();
+ while (size < length && (c = is.read(buffer, 0, (int)
Math.min(length - size, buffer.length))) >= 0) {
+ checksum.update(buffer, 0, c);
+ size += c;
+ }
+ rc.setChecksum(checksum.getValue());
+ rc.setStart(0);
+ rc.setEnd(size);
+ return rc;
+ }
public boolean isMaster() {
return master!=null;
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
Mon Nov 10 12:41:23 2008
@@ -16,14 +16,20 @@
*/
package org.apache.kahadb.replication;
+import java.io.DataOutput;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
@@ -32,13 +38,22 @@
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.store.KahaDBStore;
public class ReplicationSlave implements Service, ClusterListener,
TransportListener {
+
+ private static final int MAX_TRANSFER_SESSIONS = 1;
+
private static final Log LOG =
LogFactory.getLog(ReplicationSlave.class);
private final ReplicationServer replicationServer;
@@ -52,15 +67,76 @@
transport = TransportFactory.connect(new
URI(replicationServer.getClusterState().getMaster()));
transport.setTransportListener(this);
transport.start();
+
+ // Make sure the replication directory exists.
+ replicationServer.getTempReplicationDir().mkdirs();
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
PBSlaveInit payload = new PBSlaveInit();
payload.setNodeId(replicationServer.getNodeId());
+
+ // This call back is executed once the checkpoint is
+ // completed and all data has been
+ // synced to disk, but while a lock is still held on the
+ // store so that no
+ // updates are allowed.
+
+ HashMap<String, PBFileInfo> infosMap = new HashMap<String,
PBFileInfo>();
+
+ // Add all the files that were being transfered..
+ File tempReplicationDir =
replicationServer.getTempReplicationDir();
+ File[] list = tempReplicationDir.listFiles();
+ if( list!=null ) {
+ for (File file : list) {
+ String name = file.getName();
+ if( name.startsWith("database-") ) {
+ int snapshot;
+ try {
+ snapshot =
Integer.parseInt(name.substring("database-".length()));
+ } catch (NumberFormatException e) {
+ continue;
+ }
+
+ PBFileInfo info =
replicationServer.createInfo("database", file, file.length());
+ info.setSnapshotId(snapshot);
+ infosMap.put("database", info);
+ } else if( name.startsWith("journal-") ) {
+ PBFileInfo info =
replicationServer.createInfo(name, file, file.length());
+ infosMap.put(name, info);
+ }
+ }
+ }
+
+ // Add all the db files that were not getting transfered..
+ KahaDBStore store = replicationServer.getStore();
+ Map<Integer, DataFile> journalFiles =
store.getJournal().getFileMap();
+ for (DataFile df : journalFiles.values()) {
+ String name = "journal-" + df.getDataFileId();
+ // Did we have a transfer in progress for that file
already?
+ if( infosMap.containsKey(name) ) {
+ continue;
+ }
+ infosMap.put(name, replicationServer.createInfo(name,
df.getFile(), df.getLength()));
+ }
+ if( !infosMap.containsKey("database") ) {
+ File pageFile = store.getPageFile().getFile();
+ if( pageFile.exists() ) {
+ infosMap.put("database",
replicationServer.createInfo("database", pageFile, pageFile.length()));
+ }
+ }
+
+ ArrayList<PBFileInfo> infos = new
ArrayList<PBFileInfo>(infosMap.size());
+ for (PBFileInfo info : infosMap.values()) {
+ infos.add(info);
+ }
+ payload.setCurrentFilesList(infos);
+
frame.setPayload(payload);
- LOG.info("Sending master slave init command: "+payload);
+ LOG.info("Sending master slave init command: " + payload);
+ bulkSynchronizing = true;
transport.oneway(frame);
-
+
}
public void stop() throws Exception {
@@ -76,6 +152,8 @@
case SLAVE_INIT_RESPONSE:
onSlaveInitResponse(frame,
(PBSlaveInitResponse) frame.getPayload());
break;
+ case JOURNAL_UPDATE:
+ onJournalUpdate(frame, (PBJournalUpdate)
frame.getPayload());
}
} catch (Exception e) {
failed(e);
@@ -102,32 +180,137 @@
private Object transferMutex = new Object();
private LinkedList<PBFileInfo> transferQueue = new
LinkedList<PBFileInfo>();
+ private boolean bulkSynchronizing;
+ private PBSlaveInitResponse initResponse;
+
+ int journalUpdateFileId;
+ RandomAccessFile journalUpateFile;
+
+ HashMap<String, PBFileInfo> bulkFiles = new HashMap<String,
PBFileInfo>();
+
+ private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate
update) throws IOException {
+ boolean onlineRecovery=false;
+ PBJournalLocation location = update.getLocation();
+ byte[] data = update.getData().toByteArray();
+ synchronized (transferMutex) {
+ if( journalUpateFile==null ||
journalUpdateFileId!=location.getFileId() ) {
+ if( journalUpateFile!=null) {
+ journalUpateFile.close();
+ }
+ File file;
+ String name = "journal-"+location.getFileId();
+ if( bulkSynchronizing ) {
+ file =
replicationServer.getTempReplicationFile(name, 0);
+ if( !bulkFiles.containsKey(name) ) {
+ bulkFiles.put(name, new
PBFileInfo().setName(name));
+ }
+ } else {
+ // Once the data has been synced.. we
are going to
+ // go into an online recovery mode...
+ file =
replicationServer.getReplicationFile(name);
+ onlineRecovery=true;
+ }
+ journalUpateFile = new RandomAccessFile(file,
"rw");
+ journalUpdateFileId = location.getFileId();
+ }
+ journalUpateFile.seek(location.getOffset());
+ journalUpateFile.write(data);
+ }
+
+ if( onlineRecovery ) {
+ KahaDBStore store = replicationServer.getStore();
+ // Let the journal know that we appended to one of it's
files..
+
store.getJournal().appendedExternally(convert(location), data.length);
+ // Now incrementally recover those records.
+ store.incrementalRecover();
+ }
+ }
+
+ private Location convert(PBJournalLocation location) {
+ Location rc = new Location();
+ rc.setDataFileId(location.getFileId());
+ rc.setOffset(location.getOffset());
+ return rc;
+ }
+
+ private void commitBulkTransfer() throws IOException {
+ synchronized (transferMutex) {
+
+ journalUpateFile.close();
+ journalUpateFile=null;
+ replicationServer.getStore().close();
+
+ // If we got a new snapshot of the database, then we
need to
+ // delete it's assisting files too.
+ if( bulkFiles.containsKey("database") ) {
+ PageFile pageFile =
replicationServer.getStore().getPageFile();
+ pageFile.getRecoveryFile().delete();
+ pageFile.getFreeFile().delete();
+ }
+
+ for (PBFileInfo info : bulkFiles.values()) {
+ File from =
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+ File to =
replicationServer.getReplicationFile(info.getName());
+ move(from, to);
+ }
+
+ delete(initResponse.getDeleteFilesList());
+ bulkSynchronizing=false;
+
+ replicationServer.getStore().open();
+ }
+ replicationServer.getStore().incrementalRecover();
+ }
private void onSlaveInitResponse(ReplicationFrame frame,
PBSlaveInitResponse response) throws Exception {
- LOG.info("Got init response: "+response);
- delete(response.getDeleteFilesList());
- synchronized(transferMutex) {
+ LOG.info("Got init response: " + response);
+ initResponse = response;
+
+ synchronized (transferMutex) {
+ bulkFiles.clear();
+
+ List<PBFileInfo> infos = response.getCopyFilesList();
+ for (PBFileInfo info : infos) {
+
+ bulkFiles.put(info.getName(), info);
+ File target =
replicationServer.getReplicationFile(info.getName());
+ // are we just appending to an existing file
journal file?
+ if( info.getName().startsWith("journal-") &&
info.getStart() > 0 && target.exists() ) {
+ // Then copy across the first bits..
+ File tempFile =
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+
+ FileInputStream is = new
FileInputStream(target);
+ FileOutputStream os = new
FileOutputStream(tempFile);
+ try {
+ copy(is, os, info.getStart());
+ } finally {
+ try { is.close(); } catch
(Throwable e){}
+ try { os.close(); } catch
(Throwable e){}
+ }
+ }
+ }
+
+
transferQueue.clear();
- transferQueue.addAll(response.getCopyFilesList());
+ transferQueue.addAll(infos);
}
addTransferSession();
}
-
private PBFileInfo dequeueTransferQueue() throws Exception {
- synchronized( transferMutex ) {
- if( transferQueue.isEmpty() ) {
+ synchronized (transferMutex) {
+ if (transferQueue.isEmpty()) {
return null;
}
return transferQueue.removeFirst();
}
}
-
+
LinkedList<TransferSession> transferSessions = new
LinkedList<TransferSession>();
-
+
private void addTransferSession() throws Exception {
- synchronized( transferMutex ) {
- while( !transferQueue.isEmpty() &&
transferSessions.size()<5 ) {
+ synchronized (transferMutex) {
+ while (!transferQueue.isEmpty() &&
transferSessions.size() < MAX_TRANSFER_SESSIONS) {
TransferSession transferSession = new
TransferSession();
transferSessions.add(transferSession);
try {
@@ -136,11 +319,43 @@
transferSessions.remove(transferSession);
}
}
+ // Once we are done processing all the transfers..
+ if (transferQueue.isEmpty() &&
transferSessions.isEmpty()) {
+ commitBulkTransfer();
+ }
}
}
-
- class TransferSession implements Service, TransportListener {
+
+ private void move(File from, File to) throws IOException {
+ // If a simple rename/mv does not work..
+ to.delete();
+ if (!from.renameTo(to)) {
+
+ // Copy and Delete.
+ FileInputStream is = null;
+ FileOutputStream os = null;
+ try {
+ is = new FileInputStream(from);
+ os = new FileOutputStream(to);
+
+ os.getChannel().transferFrom(is.getChannel(),
0, is.getChannel().size());
+ } finally {
+ try {
+ is.close();
+ } finally {
+ }
+ try {
+ os.close();
+ } finally {
+ }
+ }
+ from.delete();
+ }
+ }
+
+ class TransferSession implements Service, TransportListener {
+
Transport transport;
private PBFileInfo info;
private File toFile;
@@ -158,44 +373,44 @@
private void sendNextRequestOrStop() {
try {
PBFileInfo info = dequeueTransferQueue();
- if( info !=null ) {
-
- toFile =
replicationServer.getReplicationFile(info.getName());
+ if (info != null) {
+
+ toFile =
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
this.info = info;
-
+
ReplicationFrame frame = new
ReplicationFrame();
frame.setHeader(new
PBHeader().setType(PBType.FILE_TRANSFER));
frame.setPayload(info);
-
- LOG.info("Requesting file:
"+info.getName());
+
+ LOG.info("Requesting file: " +
info.getName());
transferStart =
System.currentTimeMillis();
-
+
transport.oneway(frame);
} else {
stop();
}
-
- } catch ( Exception e ) {
+
+ } catch (Exception e) {
failed(e);
}
}
public void stop() throws Exception {
- if( stopped.compareAndSet(false, true) ) {
+ if (stopped.compareAndSet(false, true)) {
LOG.info("File transfer session stopped.");
- synchronized( transferMutex ) {
- if( info!=null ) {
+ synchronized (transferMutex) {
+ if (info != null) {
transferQueue.addLast(info);
}
info = null;
}
- Thread stopThread = new Thread("Transfer
Session Shutdown: "+transport.getRemoteAddress()) {
+ Thread stopThread = new Thread("Transfer
Session Shutdown: " + transport.getRemoteAddress()) {
@Override
public void run() {
try {
transport.stop();
- synchronized(
transferMutex ) {
-
transferSessions.remove(this);
+ synchronized
(transferMutex) {
+
transferSessions.remove(TransferSession.this);
addTransferSession();
}
} catch (Exception e) {
@@ -211,21 +426,23 @@
public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame)
command;
- InputStream is = (InputStream)
frame.getPayload();
+ InputStream is = (InputStream)
frame.getPayload();
toFile.getParentFile().mkdirs();
- FileOutputStream os = new FileOutputStream(
toFile );
+
+ RandomAccessFile os = new
RandomAccessFile(toFile, "rw");
+ os.seek(info.getStart());
try {
copy(is, os,
frame.getHeader().getPayloadSize());
- long transferTime =
System.currentTimeMillis()-this.transferStart;
- float rate =
frame.getHeader().getPayloadSize()*transferTime/1024000f;
- LOG.info("File "+info.getName()+"
transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
+ long transferTime =
System.currentTimeMillis() - this.transferStart;
+ float rate =
frame.getHeader().getPayloadSize() * transferTime / 1024000f;
+ LOG.info("File " + info.getName() + "
transfered in " + transferTime + " (ms) at " + rate + " Kb/Sec");
} finally {
os.close();
}
this.info = null;
this.toFile = null;
-
- sendNextRequestOrStop();
+
+ sendNextRequestOrStop();
} catch (Exception e) {
failed(e);
}
@@ -237,8 +454,8 @@
public void failed(Exception error) {
try {
- if( !stopped.get() ) {
- LOG.warn("Replication session failure:
"+transport.getRemoteAddress());
+ if (!stopped.get()) {
+ LOG.warn("Replication session failure:
" + transport.getRemoteAddress());
}
stop();
} catch (Exception ignore) {
@@ -247,21 +464,32 @@
public void transportInterupted() {
}
+
public void transportResumed() {
}
}
-
+
private void copy(InputStream is, OutputStream os, long length) throws
IOException {
byte buffer[] = new byte[1024 * 4];
- int c=0;
- long pos=0;
- while ( pos <length && ((c = is.read(buffer, 0,
(int)Math.min(buffer.length, length-pos))) >= 0) ) {
+ int c = 0;
+ long pos = 0;
+ while (pos < length && ((c = is.read(buffer, 0, (int)
Math.min(buffer.length, length - pos))) >= 0)) {
os.write(buffer, 0, c);
- pos+=c;
+ pos += c;
}
}
-
+
+ private void copy(InputStream is, DataOutput os, long length) throws
IOException {
+ byte buffer[] = new byte[1024 * 4];
+ int c = 0;
+ long pos = 0;
+ while (pos < length && ((c = is.read(buffer, 0, (int)
Math.min(buffer.length, length - pos))) >= 0)) {
+ os.write(buffer, 0, c);
+ pos += c;
+ }
+ }
+
private void delete(List<String> files) {
for (String fn : files) {
try {
@@ -271,5 +499,4 @@
}
}
-
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
@@ -579,6 +580,5 @@
throw new IllegalArgumentException("Not in the valid destination
format");
}
}
-
-
+
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
@@ -144,11 +145,11 @@
protected boolean deleteAllMessages;
protected File directory;
- protected boolean recovering;
protected Thread checkpointThread;
protected boolean syncWrites=true;
int checkpointInterval = 5*1000;
int cleanupInterval = 30*1000;
+ boolean opened;
protected AtomicBoolean started = new AtomicBoolean();
@@ -167,42 +168,8 @@
}
}
- public void load() throws IOException {
-
- recovering=true;
-
- // Creates the journal if it does not yet exist.
- getJournal();
- if (failIfJournalIsLocked) {
- journal.lock();
- } else {
- while (true) {
- try {
- journal.lock();
- break;
- } catch (IOException e) {
- LOG.info("Journal is locked... waiting " +
(JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be
unlocked.");
- try {
- Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
-
- // Creates the page file if it does not yet exist.
- getPageFile();
-
- journal.start();
- if (deleteAllMessages) {
- pageFile.delete();
- journal.delete();
-
- LOG.info("Persistence store purged.");
- deleteAllMessages = false;
- }
-
- synchronized (indexMutex) {
+ private void loadPageFile() throws IOException {
+ synchronized (indexMutex) {
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -216,8 +183,6 @@
metadata.destinations = new BTreeIndex<String,
StoredDestination>(pageFile, tx.allocate().getPageId());
tx.store(metadata.page, metadataMarshaller, true);
-
- store(new KahaTraceCommand().setMessage("CREATED " +
new Date()));
} else {
Page<Metadata> page = tx.load(0, metadataMarshaller);
metadata = page.get();
@@ -231,7 +196,8 @@
pageFile.flush();
// Load up all the destinations since we need to scan all the
indexes to figure out which journal files can be deleted.
- // Perhaps we should just keep an index of file
+ // Perhaps we should just keep an index of file
+ storedDestinations.clear();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator =
metadata.destinations.iterator(tx); iterator.hasNext();) {
@@ -241,13 +207,54 @@
}
}
});
+ }
+ }
+
+ public void open() throws IOException {
+ if( !opened ) {
+ getJournal();
+ if (failIfJournalIsLocked) {
+ journal.lock();
+ } else {
+ while (true) {
+ try {
+ journal.lock();
+ break;
+ } catch (IOException e) {
+ LOG.info("Journal is locked... waiting " +
(JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be
unlocked.");
+ try {
+ Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ }
+ getPageFile();
+ journal.start();
+ loadPageFile();
+ opened=true;
+ }
+ }
+
+ public void load() throws IOException {
+
+ open();
+ if (deleteAllMessages) {
+ journal.delete();
- // Replay the the journal to get the indexes up to date with the
- // latest
- // updates.
+ pageFile.unload();
+ pageFile.delete();
+ metadata = new Metadata();
+
+ LOG.info("Persistence store purged.");
+ deleteAllMessages = false;
+
+ loadPageFile();
+ }
+
+ synchronized (indexMutex) {
recover();
}
- recovering=false;
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
public void run() {
@@ -276,9 +283,18 @@
}
};
checkpointThread.start();
-
}
+
+ public void close() throws IOException {
+ synchronized (indexMutex) {
+ pageFile.unload();
+ metadata = new Metadata();
+ }
+ journal.close();
+ opened=false;
+ }
+
public void unload() throws IOException, InterruptedException {
checkpointThread.join();
@@ -293,12 +309,12 @@
}
});
-// metadata.destinations.unload(tx);
pageFile.unload();
metadata = new Metadata();
}
store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new
Date()));
journal.close();
+ opened=false;
}
/**
@@ -328,45 +344,61 @@
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
-
long start = System.currentTimeMillis();
- Location pos = null;
-
- // We need to recover the transactions..
- if (metadata.firstInProgressTransactionLocation != null) {
- pos = metadata.firstInProgressTransactionLocation;
- }
- // Perhaps there were no transactions...
- if( pos==null && metadata.lastUpdate!=null) {
- // Start replay at the record after the last one recorded in the
index file.
- pos = journal.getNextLocation(metadata.lastUpdate);
- // No journal records need to be recovered.
- if( pos == null ) {
- return;
- }
+ Location recoveryPosition = getRecoveryPosition();
+ if( recoveryPosition ==null ) {
+ return;
}
- // Do we need to start from the begining?
- if (pos == null) {
- // This loads the first position.
- pos = journal.getNextLocation(null);
- }
-
int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + journal + " at " +
pos.getDataFileId() + ":" + pos.getOffset());
+ LOG.info("Journal Recovery Started from: " + journal + " at " +
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
- while (pos != null) {
- JournalCommand message = load(pos);
- process(message, pos);
+ while (recoveryPosition != null) {
+ JournalCommand message = load(recoveryPosition);
+ process(message, recoveryPosition);
redoCounter++;
- pos = journal.getNextLocation(pos);
+ recoveryPosition = journal.getNextLocation(recoveryPosition);
}
-
- Location location = store(new KahaTraceCommand().setMessage("RECOVERED
" + new Date()), true);
long end = System.currentTimeMillis();
LOG.info("Replayed " + redoCounter + " operations from redo log in " +
((end - start) / 1000.0f) + " seconds.");
}
+
+ private Location nextRecoveryPosition;
+ private Location lastRecoveryPosition;
+
+ public void incrementalRecover() throws IOException {
+ if( nextRecoveryPosition == null ) {
+ if( lastRecoveryPosition==null ) {
+ nextRecoveryPosition = getRecoveryPosition();
+ } else {
+ nextRecoveryPosition =
journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+ while (nextRecoveryPosition != null) {
+ lastRecoveryPosition = nextRecoveryPosition;
+ JournalCommand message = load(lastRecoveryPosition);
+ process(message, lastRecoveryPosition);
+ nextRecoveryPosition =
journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+
+ private Location getRecoveryPosition() throws IOException {
+
+ // If we need to recover the transactions..
+ if (metadata.firstInProgressTransactionLocation != null) {
+ return metadata.firstInProgressTransactionLocation;
+ }
+
+ // Perhaps there were no transactions...
+ if( metadata.lastUpdate!=null) {
+ // Start replay at the record after the last one recorded in the
index file.
+ return journal.getNextLocation(metadata.lastUpdate);
+ }
+
+ // This loads the first position.
+ return journal.getNextLocation(null);
+ }
protected void checkpointCleanup(final boolean cleanup) {
try {
@@ -420,9 +452,7 @@
data.writeFramed(os);
Location location = journal.write(os.toByteSequence(), sync);
process(data, location);
- if( !recovering ) {
- metadata.lastUpdate = location;
- }
+ metadata.lastUpdate = location;
return location;
}
Modified:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
(original)
+++
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
Mon Nov 10 12:41:23 2008
@@ -21,8 +21,11 @@
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import junit.framework.TestCase;
@@ -74,7 +77,7 @@
cluster.setClusterState(clusterState);
try {
- sendMesagesTo(500, BROKER1_URI);
+ sendMesagesTo(100, BROKER1_URI);
} catch( JMSException e ) {
fail("b1 did not become a master.");
}
@@ -86,13 +89,50 @@
clusterState.setSlaves(Arrays.asList(slaves));
cluster.setClusterState(clusterState);
- Thread.sleep(10000);
+ Thread.sleep(1000);
+
+
+ try {
+ sendMesagesTo(100, BROKER1_URI);
+ } catch( JMSException e ) {
+ fail("Failed to send more messages...");
+ }
+
+ Thread.sleep(1000);
+
+ // Make broker 2 the master.
+ clusterState = new ClusterState();
+ clusterState.setMaster(BROKER2_REPLICATION_ID);
+ cluster.setClusterState(clusterState);
+
+ Thread.sleep(1000);
+
+ assertReceived(200, BROKER2_URI);
b2.stop();
b1.stop();
}
+ private void assertReceived(int count, String brokerUri) throws
JMSException {
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory(brokerUri);
+ Connection con = cf.createConnection();
+ con.start();
+ try {
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(destination);
+ for (int i = 0; i < count; i++) {
+ TextMessage m = (TextMessage)
consumer.receive(1000);
+ if( m==null ) {
+ fail("Failed to receive message: "+i);
+ }
+ System.out.println("Got: "+m.getText());
+ }
+ } finally {
+ try { con.close(); } catch (Throwable e) {}
+ }
+ }
+
private void sendMesagesTo(int count, String brokerUri) throws
JMSException {
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();