Author: chirino
Date: Tue Nov 11 12:10:04 2008
New Revision: 713149

URL: http://svn.apache.org/viewvc?rev=713149&view=rev
Log:
Added better slave synchronization handling on the master side.  We now create 
a snapshot for each slave session and clean up the snapshot once the slave is 
online.

Modified:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Tue Nov 11 12:10:04 2008
@@ -22,6 +22,9 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,16 +55,19 @@
 
 import com.google.protobuf.ByteString;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 public class ReplicationMaster implements Service, ClusterListener, 
ReplicationTarget {
 
        private static final Log LOG = 
LogFactory.getLog(ReplicationServer.class);
 
        private final ReplicationServer replicationServer;
 
-       private Object serverMutex = new Object() {
-       };
+       private Object serverMutex = new Object() {};
        private TransportServer server;
        private CopyOnWriteArrayList<ReplicationSession> sessions = new 
CopyOnWriteArrayList<ReplicationSession>();
+       
+       AtomicInteger nextSnapshotId = new AtomicInteger();
 
        public ReplicationMaster(ReplicationServer replication1Server) {
                this.replicationServer = replication1Server;
@@ -136,6 +142,10 @@
 
                private final Transport transport;
                private final AtomicBoolean subscribedToJournalUpdates = new 
AtomicBoolean();
+               
+               private File snapshotFile;
+               private HashSet<Integer> journalReplicatedFiles;
+               private boolean online;
 
                public ReplicationSession(Transport transport) {
                        this.transport = transport;
@@ -147,6 +157,7 @@
                }
 
                public void stop() throws Exception {
+                       deleteReplicationData();
                        transport.stop();
                }
 
@@ -155,9 +166,11 @@
                                ReplicationFrame frame = (ReplicationFrame) 
command;
                                switch (frame.getHeader().getType()) {
                                case SLAVE_INIT:
-                                       subscribedToJournalUpdates.set(true);
                                        onSlaveInit(frame, (PBSlaveInit) 
frame.getPayload());
                                        break;
+                               case SLAVE_ONLINE:
+                                       onSlaveOnline(frame);
+                                       break;
                                case FILE_TRANSFER:
                                        onFileTransfer(frame, (PBFileInfo) 
frame.getPayload());
                                        break;
@@ -187,8 +200,27 @@
                public void transportResumed() {
                }
                
+               private void onSlaveOnline(ReplicationFrame frame) {
+                       online = true;
+                       deleteReplicationData();
+               }
+
+               private void deleteReplicationData() {
+                       if( snapshotFile!=null ) {
+                               snapshotFile.delete();
+                               snapshotFile=null;
+                       }
+                       if( journalReplicatedFiles!=null ) {
+                               journalReplicatedFiles=null;
+                               updateJournalReplicatedFiles();
+                       }
+               }
+
                private void onSlaveInit(ReplicationFrame frame, PBSlaveInit 
slaveInit) throws Exception {
 
+                       // Start sending journal updates to the slave.
+                       subscribedToJournalUpdates.set(true);
+
                        // We could look at the slave state sent in the 
slaveInit and decide
                        // that a full sync is not needed..
                        // but for now we will do a full sync every time.
@@ -197,37 +229,78 @@
                        rc.setHeader(new 
PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
                        rc.setPayload(rcPayload);
                        
+                       // Setup a map of all the files that the slave has
+                       final HashMap<String, PBFileInfo> slaveFiles = new 
HashMap<String, PBFileInfo>();
+                       for (PBFileInfo info : slaveInit.getCurrentFilesList()) 
{
+                               slaveFiles.put(info.getName(), info);
+                       }
+                       
+                       
                        final KahaDBStore store = replicationServer.getStore();
                        store.checkpoint(new Callback() {
                                public void execute() throws Exception {
                                        // This call back is executed once the 
checkpoint is
-                                       // completed and all data has been
-                                       // synced to disk, but while a lock is 
still held on the
-                                       // store so that no
-                                       // updates are allowed.
-                                       ArrayList<PBFileInfo> infos = new 
ArrayList<PBFileInfo>();
+                                       // completed and all data has been 
synced to disk, 
+                                       // but while a lock is still held on 
the store so 
+                                       // that no updates are done while we 
are in this
+                                       // method.
+                                       
+                                       KahaDBStore store = 
replicationServer.getStore();
 
+                                       int snapshotId = 
nextSnapshotId.incrementAndGet();
+                                       File file = 
store.getPageFile().getFile();
+                                       snapshotFile = new 
File(file.getParentFile(), "snapshot-" + snapshotId);
+                                       
+                                       journalReplicatedFiles = new 
HashSet<Integer>();
+                                       
+                                       // Store the list of files associated with 
the snapshot.
+                                       ArrayList<PBFileInfo> snapshotInfos = 
new ArrayList<PBFileInfo>();
                                        Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
                                        for (DataFile df : 
journalFiles.values()) {
-                                               
infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), 
df.getFile(), df.getLength()));
+                                               // Look at what the slave has 
so that only the missing bits are transferred.
+                                               String name = "journal-" + 
df.getDataFileId();
+                                               PBFileInfo slaveInfo = 
slaveFiles.get(name);
+                                               
+                                               // Use the checksum info to see 
if the slave has the file already.. Checksums are less accurate for
+                                               // small amounts of data.. so 
ignore small files.
+                                               if( slaveInfo!=null && 
slaveInfo.getEnd()> 1024*512 ) {
+                                                       // If the slave's file 
checksum matches what we have..
+                                                       if( 
replicationServer.checksum(df.getFile(), 0, 
slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
+                                                               // Is our file 
longer? Then we need to continue transferring the rest of the file.
+                                                               if( 
df.getLength() > slaveInfo.getEnd() ) {
+                                                                       
snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 
slaveInfo.getEnd(), df.getLength()));
+                                                                       
journalReplicatedFiles.add(df.getDataFileId());
+                                                                       
continue;
+                                                               } else {
+                                                                       // No 
need to replicate this file.
+                                                                       
continue;
+                                                               }
+                                                       } 
+                                               }
+                                               
+                                               // If we got here then it means 
we need to transfer the whole file.
+                                               
snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 0, 
df.getLength()));
+                                               
journalReplicatedFiles.add(df.getDataFileId());
                                        }
+
+                                       PBFileInfo info = new PBFileInfo();
+                                       info.setName("database");
+                                       info.setSnapshotId(snapshotId);
+                                       info.setStart(0);
+                                       info.setEnd(file.length());
+                                       info.setChecksum(copyAndChecksum(file, 
snapshotFile));
+                                       snapshotInfos.add(info);
                                        
-                                       SnapshotStatus snapshot = 
createSnapshot();
-                                       PBFileInfo databaseInfo = new 
PBFileInfo();
-                                       databaseInfo.setName("database");
-                                       databaseInfo.setSnapshotId(snapshot.id);
-                                       databaseInfo.setStart(0);
-                                       databaseInfo.setEnd(snapshot.size);
-                                       
databaseInfo.setChecksum(snapshot.checksum);
-                                       infos.add(databaseInfo);
+                                       
rcPayload.setCopyFilesList(snapshotInfos);
                                        
-                                       rcPayload.setCopyFilesList(infos);
+                                       updateJournalReplicatedFiles();
                                }
+
                        });
                        
                        transport.oneway(rc);
                }
-
+               
                private void onFileTransfer(ReplicationFrame frame, PBFileInfo 
fileInfo) throws IOException {
                        File file = 
replicationServer.getReplicationFile(fileInfo.getName());
                        long payloadSize = 
fileInfo.getEnd()-fileInfo.getStart();
@@ -254,38 +327,20 @@
 
        }
 
-       static class SlaveStatus {
-               String salve_id;
-               PBJournalLocation lastAck;
-               Integer syncingSnapshot;
-       }
-
-       static class SnapshotStatus {
-               int id;
-               File file;
-               long checksum;
-               PBJournalLocation lastJournalLocation;
-               long size;
-       }
-
-
-
-       int nextSnapshotId;
-       SnapshotStatus currentSnapshot;
-       private SnapshotStatus createSnapshot() throws IOException {
-               if (currentSnapshot == null) {
-                       currentSnapshot = new SnapshotStatus();
-                       currentSnapshot.id = nextSnapshotId++;
-                       KahaDBStore store = replicationServer.getStore();
-                       File file = store.getPageFile().getFile();
-                       currentSnapshot.file = new File(file.getParentFile(), 
"snapshot-" + currentSnapshot.id);
-                       currentSnapshot.checksum = copyAndChecksum(file, 
currentSnapshot.file);
-                       currentSnapshot.lastJournalLocation = 
convert(store.getJournal().getLastAppendLocation());
-                       currentSnapshot.size = currentSnapshot.file.length();
+       /**
+        * Looks at all the journal files being currently replicated and 
informs the KahaDB so that
+        * it does not delete them while the replication is occurring.
+        */
+       private void updateJournalReplicatedFiles() {
+               HashSet<Integer>  files = 
replicationServer.getStore().getJournalFilesBeingReplicated();
+               files.clear();
+               for (ReplicationSession session : sessions) {
+                       if( session.journalReplicatedFiles !=null ) {
+                               files.addAll(session.journalReplicatedFiles);
+                       }
                }
-               return currentSnapshot;
        }
-
+       
        private PBJournalLocation convert(Location loc) {
                if( loc==null ) {
                        return null;
@@ -313,11 +368,11 @@
                } finally {
                        try {
                                is.close();
-                       } finally {
+                       } catch(Throwable e) {
                        }
                        try {
                                os.close();
-                       } finally {
+                       } catch(Throwable e) {
                        }
                }
        }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 Tue Nov 11 12:10:04 2008
@@ -17,8 +17,8 @@
 package org.apache.kahadb.replication;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
@@ -211,24 +211,36 @@
                }
        }
        
-       PBFileInfo createInfo(String name, File file, long length) throws 
IOException {
+       PBFileInfo createInfo(String name, File file, long start, long length) 
throws IOException {
                PBFileInfo rc = new PBFileInfo();
                rc.setName(name);
-               FileInputStream is = new FileInputStream(file);
-               byte buffer[] = new byte[1024 * 4];
-               int c;
-
-               long size = 0;
-               Checksum checksum = new Adler32();
-               while (size < length && (c = is.read(buffer, 0, (int) 
Math.min(length - size, buffer.length))) >= 0) {
-                       checksum.update(buffer, 0, c);
-                       size += c;
-               }
-               rc.setChecksum(checksum.getValue());
-               rc.setStart(0);
-               rc.setEnd(size);
+               rc.setChecksum(checksum(file, start, length));
+               rc.setStart(start);
+               rc.setEnd(length);
                return rc;
        }
+       
+       long checksum(File file, long start, long end) throws IOException {
+               RandomAccessFile raf = new RandomAccessFile(file, "r");
+               try {
+                       Checksum checksum = new Adler32();
+                       byte buffer[] = new byte[1024 * 4];
+                       int c;
+                       long pos = start;
+                       raf.seek(start);
+                       
+                       while (pos < end && (c = raf.read(buffer, 0, (int) 
Math.min(end - pos, buffer.length))) >= 0) {
+                               checksum.update(buffer, 0, c);
+                               pos += c;
+                       }
+                       
+                       return checksum.getValue();
+               } finally {
+                       try { raf.close(); } catch (Throwable e){}
+               }
+       }
+
+       
        public boolean isMaster() {
                return master!=null;
        }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 Tue Nov 11 12:10:04 2008
@@ -98,11 +98,11 @@
                                                continue;
                                        }
                                        
-                                       PBFileInfo info = 
replicationServer.createInfo("database", file, file.length());
+                                       PBFileInfo info = 
replicationServer.createInfo("database", file, 0, file.length());
                                        info.setSnapshotId(snapshot);
                                        infosMap.put("database", info);
                                } else if( name.startsWith("journal-") ) {
-                                       PBFileInfo info = 
replicationServer.createInfo(name, file, file.length());
+                                       PBFileInfo info = 
replicationServer.createInfo(name, file, 0, file.length());
                                        infosMap.put(name, info);
                                }
                        }
@@ -117,12 +117,12 @@
                        if( infosMap.containsKey(name) ) {
                                continue;
                        }
-                       infosMap.put(name, replicationServer.createInfo(name, 
df.getFile(), df.getLength()));
+                       infosMap.put(name, replicationServer.createInfo(name, 
df.getFile(), 0, df.getLength()));
                }
                if( !infosMap.containsKey("database") ) {
                        File pageFile = store.getPageFile().getFile();
                        if( pageFile.exists() ) {
-                               infosMap.put("database", 
replicationServer.createInfo("database", pageFile, pageFile.length()));
+                               infosMap.put("database", 
replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
                        }
                }
                
@@ -208,13 +208,15 @@
                                        // Once the data has been synced.. we 
are going to 
                                        // go into an online recovery mode...
                                        file = 
replicationServer.getReplicationFile(name);
-                                       onlineRecovery=true;
                                }
                                journalUpateFile = new RandomAccessFile(file, 
"rw");
                                journalUpdateFileId = location.getFileId();
-                       }
+                       }                       
                        journalUpateFile.seek(location.getOffset());
                        journalUpateFile.write(data);
+                       if( !bulkSynchronizing ) {
+                               onlineRecovery=true;
+                       }
                }
                
                if( onlineRecovery ) {
@@ -236,8 +238,12 @@
        private void commitBulkTransfer() throws IOException {
                synchronized (transferMutex) {
                        
-                       journalUpateFile.close();
-                       journalUpateFile=null;
+                       LOG.info("Slave synchronization complete, going 
online...");
+
+                       if( journalUpateFile!=null ) {
+                               journalUpateFile.close();
+                               journalUpateFile=null;
+                       }
                        replicationServer.getStore().close();
                        
                        // If we got a new snapshot of the database, then we 
need to 
@@ -258,7 +264,15 @@
                        bulkSynchronizing=false;
                        
                        replicationServer.getStore().open();
+                       
+                       LOG.info("Slave is now online.  We are now eligible to 
become the master.");
                }
+               
+               // Let the master know we are now online.
+               ReplicationFrame frame = new ReplicationFrame();
+               frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+               transport.oneway(frame);
+               
                replicationServer.getStore().incrementalRecover();
        }
 

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 Tue Nov 11 12:10:04 2008
@@ -417,19 +417,16 @@
 
     
        public void checkpoint(Callback closure) throws Exception {
-        try {
-            synchronized (indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, false);
-                    }
-                });
-                pageFile.flush();
-                closure.execute();
-            }
-            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new 
Date()), true);
-        } catch (IOException e) {
+        synchronized (indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, false);
+                }
+            });
+            pageFile.flush();
+            closure.execute();
         }
+        store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), 
true);
        }
 
     // /////////////////////////////////////////////////////////////////
@@ -619,6 +616,7 @@
     // /////////////////////////////////////////////////////////////////
 
     protected final Object indexMutex = new Object();
+       private final HashSet<Integer> journalFilesBeingReplicated = new 
HashSet<Integer>();
 
     private void upadateIndex(Transaction tx, KahaAddMessageCommand command, 
Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), 
tx);
@@ -775,7 +773,7 @@
     
                 });
             }
-            
+            inUseFiles.addAll(journalFilesBeingReplicated);
             Location l = metadata.lastUpdate;
             if( metadata.firstInProgressTransactionLocation!=null ) {
                 l = metadata.firstInProgressTransactionLocation;
@@ -787,12 +785,17 @@
         
         LOG.debug("Checkpoint done.");
     }
+    
+    public HashSet<Integer> getJournalFilesBeingReplicated() {
+               return journalFilesBeingReplicated;
+       }
 
     // /////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
-    private final HashMap<String, StoredDestination> storedDestinations = new 
HashMap<String, StoredDestination>();
+
+       private final HashMap<String, StoredDestination> storedDestinations = 
new HashMap<String, StoredDestination>();
 
     class StoredSubscription {
         SubscriptionInfo subscriptionInfo;

Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Tue Nov 11 
12:10:04 2008
@@ -42,14 +42,19 @@
        // @followed-by PBSlaveInitResponse     
        SLAVE_INIT_RESPONSE = 1;
   
+       // The Slave will send this command to the master once he has 
completed 
+       // all his bulk synchronizations and he is ready to take over as the 
master. 
+       //    
+       // @followed-by null    
+       SLAVE_ONLINE=2;
+       
        // Sent from the Master to the slave to replicate a Journal update.
        //    
        // @followed-by PBJournalUpdate 
        JOURNAL_UPDATE=3;       
        
-       // An ack sent from the Slave to a master to let the master know up to 
where in the journal the slave has
-       // synchronized to.  This acknowledges receipt of all previous journal 
records.  This should not be sent until
-       // all bulk file copies are complete.
+       // An ack sent back to the master in response to a received 
JOURNAL_UPDATE
        //    
        // @followed-by PBJournalLocation       
        JOURNAL_UPDATE_ACK=4;
@@ -91,9 +96,15 @@
        // The files that the slave should delete
        repeated string delete_files=2;
 }
-
 message PBJournalUpdate {
+    // Journal location of the update.
     required PBJournalLocation location=1;
+    // The data that will be written at that location.
     required bytes data=2;
+    // Should the slave send back an ack for this update.
+    optional bool send_ack=3;
+    // If true, then the slave should do a disk sync before returning a
+    // JOURNAL_UPDATE_ACK
+    optional bool disk_sync=4;
 }
 

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 Tue Nov 11 12:10:04 2008
@@ -21,7 +21,6 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -77,7 +76,7 @@
                cluster.setClusterState(clusterState);
                
                try {
-                       sendMesagesTo(100, BROKER1_URI);
+                       sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
                } catch( JMSException e ) {
                        fail("b1 did not become a master.");
                }
@@ -93,12 +92,12 @@
                
                
                try {
-                       sendMesagesTo(100, BROKER1_URI);
+                       sendMesagesTo(BROKER1_URI, 100, "Pass 2: ");
                } catch( JMSException e ) {
                        fail("Failed to send more messages...");
                }
                
-               Thread.sleep(1000);
+               Thread.sleep(2000);
                
                // Make broker 2 the master.
                clusterState = new ClusterState();
@@ -133,14 +132,14 @@
                }
        }
 
-       private void sendMesagesTo(int count, String brokerUri) throws 
JMSException {
+       private void sendMesagesTo(String brokerUri, int count, String msg) 
throws JMSException {
                ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory(brokerUri);
                Connection con = cf.createConnection();
                try {
                        Session session = con.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                        MessageProducer producer = 
session.createProducer(destination);
                        for (int i = 0; i < count; i++) {
-                               producer.send(session.createTextMessage("Hello: 
"+i));
+                               producer.send(session.createTextMessage(msg+i));
                        }
                } finally {
                        try { con.close(); } catch (Throwable e) {}


Reply via email to