Author: jbellis
Date: Wed Sep 22 03:41:18 2010
New Revision: 999739

URL: http://svn.apache.org/viewvc?rev=999739&view=rev
Log:
Table.rebuildIndex
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Sep 22 03:41:18 2010
@@ -495,6 +495,19 @@ public class CompactionManager implement
         return tablePairs;
     }
 
+    public Future submitIndexBuild(final ColumnFamilyStore cfs, final KeyIterator iter)
+    {
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                executor.beginCompaction(cfs, iter);
+                Table.open(cfs.table).rebuildIndex(cfs, iter);
+            }
+        };
+        return executor.submit(runnable);
+    }
+
     private static class AntiCompactionIterator extends CompactionIterator
     {
         private Set<SSTableScanner> scanners;
@@ -566,7 +579,7 @@ public class CompactionManager implement
     private static class CompactionExecutor extends 
DebuggableThreadPoolExecutor
     {
         private volatile ColumnFamilyStore cfs;
-        private volatile CompactionIterator ci;
+        private volatile ICompactionInfo ci;
 
         public CompactionExecutor()
         {
@@ -581,7 +594,7 @@ public class CompactionManager implement
             ci = null;
         }
 
-        void beginCompaction(ColumnFamilyStore cfs, CompactionIterator ci)
+        void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
         {
             this.cfs = cfs;
             this.ci = ci;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 
03:41:18 2010
@@ -30,10 +30,10 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -42,6 +42,7 @@ import org.apache.commons.lang.ArrayUtil
 
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -365,51 +366,11 @@ public class Table
                 {
                     synchronized (indexLockFor(mutation.key()))
                     {
-                        // read old indexed values
-                        QueryFilter filter = QueryFilter.getNamesFilter(key, 
new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
-                        ColumnFamily oldIndexedColumns = 
cfs.getColumnFamily(filter);
-
-                        // ignore obsolete column updates
-                        if (oldIndexedColumns != null)
-                        {
-                            for (IColumn oldColumn : oldIndexedColumns)
-                            {
-                                if (cfs.metadata.reconciler.reconcile((Column) 
oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
-                                {
-                                    cf.remove(oldColumn.name());
-                                    
mutatedIndexedColumns.remove(oldColumn.name());
-                                    oldIndexedColumns.remove(oldColumn.name());
-                                }
-                            }
-                        }
+                        ColumnFamily oldIndexedColumns = 
readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+                        ignoreObsoleteMutations(cf, cfs.metadata.reconciler, 
mutatedIndexedColumns, oldIndexedColumns);
 
-                        // apply the mutation
                         applyCF(cfs, key, cf, memtablesToFlush);
-
-                        // add new index entries
-                        for (byte[] columnName : mutatedIndexedColumns)
-                        {
-                            IColumn column = cf.getColumn(columnName);
-                            DecoratedKey<LocalToken> valueKey = 
cfs.getIndexKeyFor(columnName, column.value());
-                            ColumnFamily cfi = 
cfs.newIndexedColumnFamily(columnName);
-                            cfi.addColumn(new Column(mutation.key(), 
ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
-                            
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, 
memtablesToFlush);
-                        }
-
-                        // remove the old index entries
-                        if (oldIndexedColumns != null)
-                        {
-                            int localDeletionTime = 
(int)(System.currentTimeMillis() / 1000);
-                            for (Map.Entry<byte[], IColumn> entry : 
oldIndexedColumns.getColumnsMap().entrySet())
-                            {
-                                byte[] columnName = entry.getKey();
-                                IColumn column = entry.getValue();
-                                DecoratedKey<LocalToken> valueKey = 
cfs.getIndexKeyFor(columnName, column.value());
-                                ColumnFamily cfi = 
cfs.newIndexedColumnFamily(columnName);
-                                cfi.deleteColumn(mutation.key(), 
localDeletionTime, column.clock());
-                                
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, 
memtablesToFlush);
-                            }
-                        }
+                        applyIndexUpdates(mutation.key(), memtablesToFlush, 
cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
                     }
                 }
 
@@ -428,24 +389,84 @@ public class Table
             entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog);
     }
 
-    public void applyIndexedCF(ColumnFamilyStore indexedCfs, DecoratedKey 
rowKey, DecoratedKey indexedKey, ColumnFamily indexedColumnFamily) 
+    private static void ignoreObsoleteMutations(ColumnFamily cf, AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
     {
-        Memtable memtableToFlush;
-        flusherLock.readLock().lock();
-        try
+        if (oldIndexedColumns == null)
+            return;
+
+        for (IColumn oldColumn : oldIndexedColumns)
         {
-            synchronized (indexLockFor(rowKey.key))
+            if (reconciler.reconcile((Column) oldColumn, (Column) 
cf.getColumn(oldColumn.name())).equals(oldColumn))
             {
-                memtableToFlush = indexedCfs.apply(indexedKey, 
indexedColumnFamily);
+                cf.remove(oldColumn.name());
+                mutatedIndexedColumns.remove(oldColumn.name());
+                oldIndexedColumns.remove(oldColumn.name());
             }
         }
-        finally 
+    }
+
+    private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<byte[]> mutatedIndexedColumns)
+    {
+        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
+        return cfs.getColumnFamily(filter);
+    }
+
+    private static void applyIndexUpdates(byte[] key,
+                                          HashMap<ColumnFamilyStore, Memtable> 
memtablesToFlush,
+                                          ColumnFamily cf,
+                                          ColumnFamilyStore cfs,
+                                          SortedSet<byte[]> 
mutatedIndexedColumns,
+                                          ColumnFamily oldIndexedColumns)
+    {
+        // add new index entries
+        for (byte[] columnName : mutatedIndexedColumns)
         {
-            flusherLock.readLock().unlock();
+            IColumn column = cf.getColumn(columnName);
+            DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, 
column.value());
+            ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+            cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, 
column.clock()));
+            applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, 
cfi, memtablesToFlush);
         }
 
-        if (memtableToFlush != null)
-            indexedCfs.maybeSwitchMemtable(memtableToFlush, false);
+        // remove the old index entries
+        if (oldIndexedColumns != null)
+        {
+            int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+            for (Map.Entry<byte[], IColumn> entry : 
oldIndexedColumns.getColumnsMap().entrySet())
+            {
+                byte[] columnName = entry.getKey();
+                IColumn column = entry.getValue();
+                DecoratedKey<LocalToken> valueKey = 
cfs.getIndexKeyFor(columnName, column.value());
+                ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+                cfi.deleteColumn(key, localDeletionTime, column.clock());
+                applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, 
cfi, memtablesToFlush);
+            }
+        }
+    }
+
+    public void rebuildIndex(ColumnFamilyStore cfs, KeyIterator iter)
+    {
+        while (iter.hasNext())
+        {
+            DecoratedKey key = iter.next();
+            HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
+            flusherLock.readLock().lock();
+            try
+            {
+                synchronized (indexLockFor(key.key))
+                {
+                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, cfs.getIndexedColumns());
+                    applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(), null);
+                }
+            }
+            finally
+            {
+                flusherLock.readLock().unlock();
+            }
+
+            for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
+                entry.getKey().maybeSwitchMemtable(entry.getValue(), false);
+        }
     }
 
     private Object indexLockFor(byte[] key)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
Wed Sep 22 03:41:18 2010
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,11 +36,11 @@ import org.apache.commons.collections.it
 
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
 
-public class CompactionIterator extends 
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow> implements 
Closeable
+public class CompactionIterator extends 
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
+implements Closeable, ICompactionInfo
 {
     private static Logger logger = 
LoggerFactory.getLogger(CompactionIterator.class);
 

Added: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=999739&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Wed 
Sep 22 03:41:18 2010
@@ -0,0 +1,8 @@
+package org.apache.cassandra.io;
+
+public interface ICompactionInfo
+{
+    public long getTotalBytes();
+
+    public long getBytesRead();
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999739&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java 
Wed Sep 22 03:41:18 2010
@@ -0,0 +1,64 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements ICompactionInfo, Closeable
+{
+    private final BufferedRandomAccessFile in;
+    private final Descriptor desc;
+
+    public KeyIterator(Descriptor desc) throws IOException
+    {
+        this.desc = desc;
+        in = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+    }
+
+    protected DecoratedKey computeNext()
+    {
+        try
+        {
+            if (in.isEOF())
+                return endOfData();
+            DecoratedKey key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(in));
+            in.readLong(); // skip data position
+            return key;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public void close() throws IOException
+    {
+        in.close();
+    }
+
+    public long getBytesRead()
+    {
+        return in.getFilePointer();
+    }
+
+    public long getTotalBytes()
+    {
+        try
+        {
+            return in.length();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Wed 
Sep 22 03:41:18 2010
@@ -23,7 +23,8 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
Wed Sep 22 03:41:18 2010
@@ -22,8 +22,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.*;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -225,7 +225,7 @@ public class SSTableWriter extends SSTab
             return;
 
         ColumnFamilyStore cfs = 
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-        Set<byte[]> indexedColumns = cfs.getIndexedColumns();
+
         // remove existing files
         ifile.delete();
         ffile.delete();
@@ -255,54 +255,11 @@ public class SSTableWriter extends SSTab
             {
                 key = SSTableReader.decodeKey(StorageService.getPartitioner(), 
desc, FBUtilities.readShortByteArray(dfile));
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
-                if (!indexedColumns.isEmpty())
-                {
-                    // skip bloom filter and column index
-                    dfile.readFully(new byte[dfile.readInt()]);
-                    dfile.readFully(new byte[dfile.readInt()]);
-
-                    // index the column data
-                    ColumnFamily cf = ColumnFamily.create(desc.ksname, 
desc.cfname);
-                    
ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
-                    int columns = dfile.readInt();
-                    for (int i = 0; i < columns; i++)
-                    {
-                        IColumn iColumn = 
cf.getColumnSerializer().deserialize(dfile);
-                        if (indexedColumns.contains(iColumn.name()))
-                        {
-                            DecoratedKey valueKey = 
cfs.getIndexKeyFor(iColumn.name(), iColumn.value());
-                            ColumnFamily indexedCf = 
cfs.newIndexedColumnFamily(iColumn.name());
-                            indexedCf.addColumn(new Column(key.key, 
ArrayUtils.EMPTY_BYTE_ARRAY, iColumn.clock()));
-                            logger.debug("adding indexed column row mutation 
for key {}", valueKey);
-                            
Table.open(desc.ksname).applyIndexedCF(cfs.getIndexedColumnFamilyStore(iColumn.name()),
-                                                                   key,
-                                                                   valueKey,
-                                                                   indexedCf);
-                        }
-                    }
-                }
-
                 iwriter.afterAppend(key, dataPosition);
                 dataPosition = dfile.getFilePointer() + dataSize;
                 dfile.seek(dataPosition);
                 rows++;
             }
-
-            for (byte[] column : cfs.getIndexedColumns())
-            {
-                try
-                {
-                    
cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
-                }
-                catch (ExecutionException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
         }
         finally
         {
@@ -313,7 +270,26 @@ public class SSTableWriter extends SSTab
             }
             catch (IOException e)
             {
-                logger.error("Failed to close data or index file during 
recovery of " + desc, e);
+                throw new IOError(e);
+            }
+        }
+
+        if (!cfs.getIndexedColumns().isEmpty())
+        {
+            Future future = CompactionManager.instance.submitIndexBuild(cfs, new KeyIterator(desc));
+            try
+            {
+                future.get();
+                for (byte[] column : cfs.getIndexedColumns())
+                    
cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
             }
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed 
Sep 22 03:41:18 2010
@@ -23,27 +23,20 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.*;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.commons.cli.*;
+
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.*;
 
 import static org.apache.cassandra.utils.FBUtilities.bytesToHex;
 import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
-import org.apache.commons.cli.*;
 
 /**
  * Export SSTables to JSON format.
@@ -158,18 +151,14 @@ public class SSTableExport
     public static void enumeratekeys(String ssTableFile, PrintStream outs)
     throws IOException
     {
-        IPartitioner partitioner = StorageService.getPartitioner();
         Descriptor desc = Descriptor.fromFilename(ssTableFile);
-        BufferedRandomAccessFile input = new 
BufferedRandomAccessFile(desc.filenameFor(Component.PRIMARY_INDEX), "r");
-        while (!input.isEOF())
+        KeyIterator iter = new KeyIterator(desc);
+        while (iter.hasNext())
         {
-            DecoratedKey decoratedKey = SSTableReader.decodeKey(partitioner,
-                                                                desc,
-                                                                
FBUtilities.readShortByteArray(input));
-            long dataPosition = input.readLong();
-            outs.println(bytesToHex(decoratedKey.key));
+            DecoratedKey key = iter.next();
+            outs.println(bytesToHex(key.key));
         }
-
+        iter.close();
         outs.flush();
     }
 


Reply via email to