Revision: 87e732a6c911
Author:   Rick Shaw <wfs...@gmail.com>
Date:     Sun Nov  4 19:51:06 2012
Log:      Support for Collections Step #1
http://code.google.com/a/apache-extras.org/p/cassandra-jdbc/source/detail?r=87e732a6c911

Added:
 /src/main/java/org/apache/cassandra/cql/jdbc/ListMaker.java
 /src/main/java/org/apache/cassandra/cql/jdbc/MapMaker.java
 /src/main/java/org/apache/cassandra/cql/jdbc/Pair.java
 /src/main/java/org/apache/cassandra/cql/jdbc/SetMaker.java
 /src/test/java/org/apache/cassandra/cql/jdbc/CollectionsTest.java
Modified:
 /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSet.java
 /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSetExtras.java
 /src/main/java/org/apache/cassandra/cql/jdbc/ColumnDecoder.java
 /src/main/java/org/apache/cassandra/cql/jdbc/TypedColumn.java
 /src/main/java/org/apache/cassandra/cql/jdbc/Utils.java
 /src/test/java/org/apache/cassandra/cql/jdbc/DataSourceTest.java
 /src/test/java/org/apache/cassandra/cql/jdbc/SpashScreenTest.java
 /src/test/resources/log4j.properties

=======================================
--- /dev/null
+++ /src/main/java/org/apache/cassandra/cql/jdbc/ListMaker.java Sun Nov 4 19:51:06 2012
@@ -0,0 +1,85 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.jdbc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ListMaker<T>
+{
+    // interning instances
+ private static final Map<AbstractJdbcType<?>, ListMaker> instances = new HashMap<AbstractJdbcType<?>, ListMaker>();
+
+    public final AbstractJdbcType<T> elements;
+
+
+ public static synchronized <T> ListMaker<T> getInstance(AbstractJdbcType<T> elements)
+    {
+        ListMaker<T> t = instances.get(elements);
+        if (t == null)
+        {
+            t = new ListMaker<T>(elements);
+            instances.put(elements, t);
+        }
+        return t;
+    }
+
+    private ListMaker(AbstractJdbcType<T> elements)
+    {
+        this.elements = elements;
+    }
+
+    public List<T> compose(ByteBuffer bytes)
+    {
+        ByteBuffer input = bytes.duplicate();
+        int n = input.getShort();
+        List<T> l = new ArrayList<T>(n);
+        for (int i = 0; i < n; i++)
+        {
+            int s = input.getShort();
+            byte[] data = new byte[s];
+            input.get(data);
+            ByteBuffer databb = ByteBuffer.wrap(data);
+            l.add(elements.compose(databb));
+        }
+        return l;
+    }
+
+    /**
+     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
+     * where:
+     *   n is the number of elements
+     *   s_i is the number of bytes composing the ith element
+     *   b_i is the s_i bytes composing the ith element
+     */
+    public ByteBuffer decompose(List<T> value)
+    {
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
+        int size = 0;
+        for (T elt : value)
+        {
+            ByteBuffer bb = elements.decompose(elt);
+            bbs.add(bb);
+            size += 2 + bb.remaining();
+        }
+        return Utils.pack(bbs, value.size(), size);
+    }
+}
=======================================
--- /dev/null
+++ /src/main/java/org/apache/cassandra/cql/jdbc/MapMaker.java Sun Nov 4 19:51:06 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.jdbc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MapMaker<K, V>
+{
+    // interning instances
+ private static final Map<Pair<AbstractJdbcType<?>, AbstractJdbcType<?>>, MapMaker> instances = new HashMap<Pair<AbstractJdbcType<?>, AbstractJdbcType<?>>, MapMaker>();
+
+    public final AbstractJdbcType<K> keys;
+    public final AbstractJdbcType<V> values;
+
+ public static synchronized <K, V> MapMaker<K, V> getInstance(AbstractJdbcType<K> keys, AbstractJdbcType<V> values)
+    {
+ Pair<AbstractJdbcType<?>, AbstractJdbcType<?>> p = Pair.<AbstractJdbcType<?>, AbstractJdbcType<?>>create(keys, values);
+        MapMaker<K, V> t = instances.get(p);
+        if (t == null)
+        {
+            t = new MapMaker<K, V>(keys, values);
+            instances.put(p, t);
+        }
+        return t;
+    }
+
+    private MapMaker(AbstractJdbcType<K> keys, AbstractJdbcType<V> values)
+    {
+        this.keys = keys;
+        this.values = values;
+    }
+
+    public Map<K, V> compose(ByteBuffer bytes)
+    {
+        ByteBuffer input = bytes.duplicate();
+        int n = input.getShort();
+        Map<K, V> m = new LinkedHashMap<K, V>(n);
+        for (int i = 0; i < n; i++)
+        {
+            int sk = input.getShort();
+            byte[] datak = new byte[sk];
+            input.get(datak);
+            ByteBuffer kbb = ByteBuffer.wrap(datak);
+
+            int sv = input.getShort();
+            byte[] datav = new byte[sv];
+            input.get(datav);
+            ByteBuffer vbb = ByteBuffer.wrap(datav);
+
+            m.put(keys.compose(kbb), values.compose(vbb));
+        }
+        return m;
+    }
+
+    /**
+ * Layout is: {@code <n><sk_1><k_1><sv_1><v_1>...<sk_n><k_n><sv_n><v_n> }
+     * where:
+     *   n is the number of elements
+     *   sk_i is the number of bytes composing the ith key k_i
+     *   k_i is the sk_i bytes composing the ith key
+     *   sv_i is the number of bytes composing the ith value v_i
+     *   v_i is the sv_i bytes composing the ith value
+     */
+    public ByteBuffer decompose(Map<K, V> value)
+    {
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * value.size());
+        int size = 0;
+        for (Map.Entry<K, V> entry : value.entrySet())
+        {
+            ByteBuffer bbk = keys.decompose(entry.getKey());
+            ByteBuffer bbv = values.decompose(entry.getValue());
+            bbs.add(bbk);
+            bbs.add(bbv);
+            size += 4 + bbk.remaining() + bbv.remaining();
+        }
+        return Utils.pack(bbs, value.size(), size);
+    }
+
+}
=======================================
--- /dev/null
+++ /src/main/java/org/apache/cassandra/cql/jdbc/Pair.java Sun Nov 4 19:51:06 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.jdbc;
+
+import com.google.common.base.Objects;
+
+public class Pair<T1, T2>
+{
+    public final T1 left;
+    public final T2 right;
+
+    protected Pair(T1 left, T2 right)
+    {
+        this.left = left;
+        this.right = right;
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        int hashCode = 31 + (left == null ? 0 : left.hashCode());
+        return 31*hashCode + (right == null ? 0 : right.hashCode());
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if(!(o instanceof Pair))
+            return false;
+        Pair that = (Pair)o;
+        // handles nulls properly
+ return Objects.equal(left, that.left) && Objects.equal(right, that.right);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "(" + left + "," + right + ")";
+    }
+
+    public static <X, Y> Pair<X, Y> create(X x, Y y)
+    {
+        return new Pair<X, Y>(x, y);
+    }
+}
=======================================
--- /dev/null
+++ /src/main/java/org/apache/cassandra/cql/jdbc/SetMaker.java Sun Nov 4 19:51:06 2012
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql.jdbc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SetMaker<T>
+{
+    // interning instances
+ private static final Map<AbstractJdbcType<?>, SetMaker> instances = new HashMap<AbstractJdbcType<?>, SetMaker>();
+
+    public final AbstractJdbcType<T> elements;
+
+
+ public static synchronized <T> SetMaker<T> getInstance(AbstractJdbcType<T> elements)
+    {
+        SetMaker<T> t = instances.get(elements);
+        if (t == null)
+        {
+            t = new SetMaker<T>(elements);
+            instances.put(elements, t);
+        }
+        return t;
+    }
+
+    private SetMaker(AbstractJdbcType<T> elements)
+    {
+        this.elements = elements;
+    }
+
+    public Set<T> compose(ByteBuffer bytes)
+    {
+        ByteBuffer input = bytes.duplicate();
+        int n = input.getShort();
+        Set<T> l = new LinkedHashSet<T>(n);
+        for (int i = 0; i < n; i++)
+        {
+            int s = input.getShort();
+            byte[] data = new byte[s];
+            input.get(data);
+            ByteBuffer databb = ByteBuffer.wrap(data);
+            l.add(elements.compose(databb));
+        }
+        return l;
+    }
+
+    /**
+     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
+     * where:
+     *   n is the number of elements
+     *   s_i is the number of bytes composing the ith element
+     *   b_i is the s_i bytes composing the ith element
+     */
+    public ByteBuffer decompose(Set<T> value)
+    {
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
+        int size = 0;
+        for (T elt : value)
+        {
+            ByteBuffer bb = elements.decompose(elt);
+            bbs.add(bb);
+            size += 2 + bb.remaining();
+        }
+        return Utils.pack(bbs, value.size(), size);
+    }
+
+}
=======================================
--- /dev/null
+++ /src/test/java/org/apache/cassandra/cql/jdbc/CollectionsTest.java Sun Nov 4 19:51:06 2012
@@ -0,0 +1,290 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cql.jdbc;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
+import org.apache.cassandra.cql.ConnectionDetails;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test CQL Collections Data Types
+ * List
+ * Map
+ * Set
+ *
+ */
+public class CollectionsTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(CollectionsTest.class);
+
+
+ private static final String HOST = System.getProperty("host", ConnectionDetails.getHost()); + private static final int PORT = Integer.parseInt(System.getProperty("port", ConnectionDetails.getPort() + ""));
+    private static final String KEYSPACE = "testks";
+    private static final String SYSTEM = "system";
+    private static final String CQLV3 = "3.0.0";
+
+    private static java.sql.Connection con = null;
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception
+    {
+        Class.forName("org.apache.cassandra.cql.jdbc.CassandraDriver");
+ String URL = String.format("jdbc:cassandra://%s:%d/%s?version=%s", HOST, PORT, SYSTEM, CQLV3);
+
+        con = DriverManager.getConnection(URL);
+
+        LOG.debug("URL         = '{}'", URL);
+
+        Statement stmt = con.createStatement();
+
+        // Use Keyspace
+        String useKS = String.format("USE %s;", KEYSPACE);
+
+        // Drop Keyspace
+        String dropKS = String.format("DROP KEYSPACE %s;", KEYSPACE);
+
+        try
+        {
+            stmt.execute(dropKS);
+        }
+        catch (Exception e)
+        {/* Exception on DROP is OK */}
+
+        // Create KeySpace
+ String createKS = String.format("CREATE KEYSPACE %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",KEYSPACE); +// String createKS = String.format("CREATE KEYSPACE %s WITH strategy_class = SimpleStrategy AND strategy_options:replication_factor = 1;",KEYSPACE);
+        LOG.debug("createKS    = '{}'", createKS);
+
+        stmt = con.createStatement();
+        stmt.execute("USE " + SYSTEM);
+        stmt.execute(createKS);
+        stmt.execute(useKS);
+
+
+        // Create the target Table (CF)
+ String createTable = "CREATE TABLE testcollection (" + " k int PRIMARY KEY," + " L list<bigint>," + " M map<double, boolean>," + " S set<text>" + ") ;";
+        LOG.debug("createTable = '{}'", createTable);
+
+        stmt.execute(createTable);
+        stmt.close();
+        con.close();
+
+        // open it up again to see the new TABLE
+ URL = String.format("jdbc:cassandra://%s:%d/%s?version=%s", HOST, PORT, KEYSPACE, CQLV3);
+        con = DriverManager.getConnection(URL);
+        LOG.debug("URL         = '{}'", URL);
+
+        Statement statement = con.createStatement();
+
+ String insert = "INSERT INTO testcollection (k,L) VALUES( 1,[1, 3, 12345]);";
+        statement.executeUpdate(insert);
+ String update1 = "UPDATE testcollection SET S = {'red', 'white', 'blue'} WHERE k = 1;"; + String update2 = "UPDATE testcollection SET M = {2.0: 'true', 4.0: 'false', 6.0 : 'true'} WHERE k = 1;";
+        statement.executeUpdate(update1);
+        statement.executeUpdate(update2);
+
+
+ LOG.debug("Unit Test: 'CollectionsTest' initialization complete.\n\n");
+    }
+
+    /**
+     * Close down the connection when complete
+     */
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception
+    {
+        if (con != null) con.close();
+    }
+
+    @Test
+    public void testReadList() throws Exception
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("Test: 'testReadList'.\n");
+
+        Statement statement = con.createStatement();
+
+ ResultSet result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+
+        assertEquals(1, result.getInt("k"));
+
+        Object myObj = result.getObject("l");
+        LOG.debug("l           = '{}'", myObj);
+        List<Long> myList = (List<Long>) myObj;
+        assertEquals(3, myList.size());
+        assertTrue(12345L == myList.get(2));
+        assertTrue(myObj instanceof ArrayList);
+
+        myList = (List<Long>) extras(result).getList("l");
+        statement.close();
+        assertTrue(3L == myList.get(1));
+
+        if (LOG.isDebugEnabled()) LOG.debug("\n");
+    }
+
+    @Test
+    public void testUpdateList() throws Exception
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("Test: 'testUpdateList'.\n");
+
+        Statement statement = con.createStatement();
+
+ String update1 = "UPDATE testcollection SET L = L + [2,4,6] WHERE k = 1;";
+        statement.executeUpdate(update1);
+
+ ResultSet result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+
+        assertEquals(1, result.getInt("k"));
+        Object myObj = result.getObject("l");
+        List<Long> myList = (List<Long>) myObj;
+        assertEquals(6, myList.size());
+        assertTrue(12345L == myList.get(2));
+
+        LOG.debug("l           = '{}'", myObj);
+
+ String update2 = "UPDATE testcollection SET L = [98,99,100] + L WHERE k = 1;";
+        statement.executeUpdate(update2);
+ result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+        myObj = result.getObject("l");
+        myList = (List<Long>) myObj;
+        assertTrue(100L == myList.get(0));
+
+        LOG.debug("l           = '{}'", myObj);
+
+ String update3 = "UPDATE testcollection SET L[0] = 2000 WHERE k = 1;";
+        statement.executeUpdate(update3);
+ result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+        myObj = result.getObject("l");
+        myList = (List<Long>) myObj;
+
+        LOG.debug("l           = '{}'", myObj);
+
+// String update4 = "UPDATE testcollection SET L = L + ? WHERE k = 1;";
+//
+//        PreparedStatement prepared = con.prepareStatement(update4);
+//        prepared.setLong(1, 8888L);
+//        prepared.executeUpdate();
+//
+// result = prepared.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+//        result.next();
+//        myObj = result.getObject("l");
+//        myList = (List<Long>) myObj;
+//
+//        LOG.debug("l           = '{}'", myObj);
+
+        if (LOG.isDebugEnabled()) LOG.debug("\n");
+    }
+
+    @Test
+    public void testReadSet() throws Exception
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("Test: 'testReadSet'.\n");
+
+        Statement statement = con.createStatement();
+
+
+ ResultSet result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+
+        assertEquals(1, result.getInt("k"));
+
+        Object myObj = result.getObject("s");
+        LOG.debug("s           = '{}'", myObj);
+        Set<String> mySet = (Set<String>) myObj;
+        assertEquals(3, mySet.size());
+        assertTrue(mySet.contains("white"));
+        assertTrue(myObj instanceof LinkedHashSet);
+
+        if (LOG.isDebugEnabled()) LOG.debug("\n");
+    }
+
+    @Test
+    public void testUpdateSet() throws Exception
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("Test: 'testUpdateSet'.\n");
+
+        Statement statement = con.createStatement();
+
+        // add some items to the set
+ String update1 = "UPDATE testcollection SET S = S + {'green', 'white', 'orange'} WHERE k = 1;";
+        statement.executeUpdate(update1);
+
+ ResultSet result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+
+        assertEquals(1, result.getInt("k"));
+        Object myObj = result.getObject("s");
+        Set<String> mySet = (Set<String>) myObj;
+        assertEquals(5, mySet.size());
+        assertTrue(mySet.contains("white"));
+
+        LOG.debug("l           = '{}'", myObj);
+
+        // remove an item from the set
+ String update2 = "UPDATE testcollection SET S = S - {'red'} WHERE k = 1;";
+        statement.executeUpdate(update2);
+
+ result = statement.executeQuery("SELECT * FROM testcollection WHERE k = 1;");
+        result.next();
+
+        assertEquals(1, result.getInt("k"));
+
+        myObj = result.getObject("s");
+        mySet = (Set<String>) myObj;
+        assertEquals(4, mySet.size());
+        assertTrue(mySet.contains("white"));
+        assertFalse(mySet.contains("red"));
+
+        LOG.debug("s           = '{}'", myObj);
+
+        if (LOG.isDebugEnabled()) LOG.debug("\n");
+    }
+
+
+ private CassandraResultSetExtras extras(ResultSet result) throws Exception
+    {
+ Class crse = Class.forName("org.apache.cassandra.cql.jdbc.CassandraResultSetExtras");
+        return (CassandraResultSetExtras) result.unwrap(crse);
+    }
+
+}
=======================================
--- /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSet.java Sun Oct 14 10:02:03 2012 +++ /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSet.java Sun Nov 4 19:51:06 2012
@@ -32,11 +32,14 @@
 import java.sql.Date;
 import java.util.*;

+import org.apache.cassandra.cql.jdbc.TypedColumn.CollectionType;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.CqlMetadata;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * <p>The Supported Data types in CQL are as follows:</p>
@@ -61,6 +64,8 @@
  */
class CassandraResultSet extends AbstractResultSet implements CassandraResultSetExtras
 {
+ private static final Logger logger = LoggerFactory.getLogger(CassandraResultSet.class);
+
     public static final int DEFAULT_TYPE = ResultSet.TYPE_FORWARD_ONLY;
public static final int DEFAULT_CONCURRENCY = ResultSet.CONCUR_READ_ONLY; public static final int DEFAULT_HOLDABILITY = ResultSet.HOLD_CURSORS_OVER_COMMIT;
@@ -78,7 +83,7 @@
     /**
      * The values.
      */
-    private List<TypedColumn<?>> values = new ArrayList<TypedColumn<?>>();
+    private List<TypedColumn> values = new ArrayList<TypedColumn>();

     /**
      * The index map.
@@ -153,7 +158,7 @@
         // loop through the columns
         for (Column col : cols)
         {
-            TypedColumn<?> c = createColumn(col);
+            TypedColumn c = createColumn(col);
             String columnName = c.getNameString();
             values.add(c);
indexMap.put(columnName, values.size()); // one greater than 0 based index of a list
@@ -246,7 +251,7 @@
return (getBigDecimal(indexMap.get(name).intValue())).setScale(scale);
     }

- private BigDecimal getBigDecimal(TypedColumn<?> column) throws SQLException + private BigDecimal getBigDecimal(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -285,7 +290,7 @@
         return getBigInteger(indexMap.get(name).intValue());
     }

- private BigInteger getBigInteger(TypedColumn<?> column) throws SQLException + private BigInteger getBigInteger(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -323,7 +328,7 @@
         return getBoolean(indexMap.get(name).intValue());
     }

- private final Boolean getBoolean(TypedColumn<?> column) throws SQLException + private final Boolean getBoolean(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -363,7 +368,7 @@
         return getByte(indexMap.get(name).intValue());
     }

-    private final Byte getByte(TypedColumn<?> column) throws SQLException
+    private final Byte getByte(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -399,7 +404,7 @@
         return getBytes(indexMap.get(name).intValue());
     }

-    private byte[] getBytes(TypedColumn<?> column) throws SQLException
+    private byte[] getBytes(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         ByteBuffer value = (ByteBuffer) column.getRawColumn().value;
@@ -407,14 +412,14 @@
         return value == null ? null : ByteBufferUtil.clone(value).array();
     }

-    public TypedColumn<?> getColumn(int index) throws SQLException
+    public TypedColumn getColumn(int index) throws SQLException
     {
         checkIndex(index);
         checkNotClosed();
         return values.get(index - 1);
     }

-    public TypedColumn<?> getColumn(String name) throws SQLException
+    public TypedColumn getColumn(String name) throws SQLException
     {
         checkName(name);
         checkNotClosed();
@@ -453,7 +458,7 @@
         return getDate(name);
     }

-    private Date getDate(TypedColumn<?> column) throws SQLException
+    private Date getDate(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -489,7 +494,7 @@
         return getDouble(indexMap.get(name).intValue());
     }

- private final Double getDouble(TypedColumn<?> column) throws SQLException
+    private final Double getDouble(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -543,7 +548,7 @@
         return getFloat(indexMap.get(name).intValue());
     }

-    private final Float getFloat(TypedColumn<?> column) throws SQLException
+    private final Float getFloat(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -591,7 +596,7 @@
         return getInt(indexMap.get(name).intValue());
     }

-    private int getInt(TypedColumn<?> column) throws SQLException
+    private int getInt(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -621,6 +626,28 @@
     {
         return curRowKey;
     }
+
+    public List<?> getList(int index) throws SQLException
+    {
+        checkIndex(index);
+        return getList(values.get(index - 1));
+    }
+
+    public List<?> getList(String name) throws SQLException
+    {
+        checkName(name);
+        return getList(indexMap.get(name).intValue());
+    }
+
+    private List<?> getList(TypedColumn column) throws SQLException
+    {
+        checkNotClosed();
+        Object value = column.getValue();
+        wasNull = value == null;
+        if (column.getCollectionType() != CollectionType.LIST)
+ throw new SQLSyntaxErrorException(String.format(NOT_TRANSLATABLE, value.getClass().getSimpleName(), "List"));
+        return (List<?>) value;
+    }

     public long getLong(int index) throws SQLException
     {
@@ -634,7 +661,7 @@
         return getLong(indexMap.get(name).intValue());
     }

-    private Long getLong(TypedColumn<?> column) throws SQLException
+    private Long getLong(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -662,6 +689,28 @@
throw new SQLSyntaxErrorException(String.format(NOT_TRANSLATABLE, value.getClass().getSimpleName(), "Long"));
     }

+    public Map<?,?> getMap(int index) throws SQLException
+    {
+        checkIndex(index);
+        return getMap(values.get(index - 1));
+    }
+
+    public Map<?,?> getMap(String name) throws SQLException
+    {
+        checkName(name);
+        return getMap(indexMap.get(name).intValue());
+    }
+
+    private Map<?,?> getMap(TypedColumn column) throws SQLException
+    {
+        checkNotClosed();
+        Object value = column.getValue();
+        wasNull = value == null;
+        if (column.getCollectionType() != CollectionType.MAP)
+ throw new SQLSyntaxErrorException(String.format(NOT_TRANSLATABLE, value.getClass().getSimpleName(), "Map"));
+        return (Map<?,?>) value;
+    }
+
     public ResultSetMetaData getMetaData() throws SQLException
     {
         checkNotClosed();
@@ -681,7 +730,7 @@
     }


-    private Object getObject(TypedColumn<?> column) throws SQLException
+    private Object getObject(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -707,7 +756,7 @@
         return getRowId(indexMap.get(name).intValue());
     }

-    private final RowId getRowId(TypedColumn<?> column) throws SQLException
+    private final RowId getRowId(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         ByteBuffer value =  column.getRawColumn().value;
@@ -720,6 +769,28 @@
         checkIndex(index);
         return getShort(values.get(index - 1));
     }
+
+    public Set<?> getSet(int index) throws SQLException
+    {
+        checkIndex(index);
+        return getSet(values.get(index - 1));
+    }
+
+    public Set<?> getSet(String name) throws SQLException
+    {
+        checkName(name);
+        return getSet(indexMap.get(name).intValue());
+    }
+
+    private Set<?> getSet(TypedColumn column) throws SQLException
+    {
+        checkNotClosed();
+        Object value = column.getValue();
+        wasNull = value == null;
+        if (column.getCollectionType() != CollectionType.SET)
+ throw new SQLSyntaxErrorException(String.format(NOT_TRANSLATABLE, value.getClass().getSimpleName(), "Set"));
+        return (Set<?>) value;
+    }

     public short getShort(String name) throws SQLException
     {
@@ -727,7 +798,7 @@
         return getShort(indexMap.get(name).intValue());
     }

-    private final Short getShort(TypedColumn<?> column) throws SQLException
+    private final Short getShort(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -771,7 +842,7 @@
         return getString(indexMap.get(name).intValue());
     }

-    private String getString(TypedColumn<?> column) throws SQLException
+    private String getString(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -805,7 +876,7 @@
         return getTime(name);
     }

-    private Time getTime(TypedColumn<?> column) throws SQLException
+    private Time getTime(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -855,7 +926,7 @@
         return getTimestamp(name);
     }

- private Timestamp getTimestamp(TypedColumn<?> column) throws SQLException
+    private Timestamp getTimestamp(TypedColumn column) throws SQLException
     {
         checkNotClosed();
         Object value = column.getValue();
@@ -954,7 +1025,9 @@
     {
         if (hasMoreRows())
         {
-            populateColumns();
+ // populateColumns is called upon init to set up the metadata fields; so skip first call + if (rowNumber != 0) populateColumns(); else rowsIterator.next();
+//            populateColumns();
             rowNumber++;
             return true;
         }
@@ -964,31 +1037,56 @@
             return false;
         }
     }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-       private TypedColumn<?> createColumn(Column column)
+
+    private String bbToString(ByteBuffer buffer)
     {
-        assert column != null;
-        assert column.name != null;
-
-        String columnname=null;
-
         try
         {
-            columnname = string(column.name);
+            return string(buffer);
         }
         catch (CharacterCodingException e)
         {
             throw new RuntimeException(e);
         }

+    }
+
+    private TypedColumn createColumn(Column column)
+    {
+        assert column != null;
+        assert column.name != null;
+
+        AbstractJdbcType<?> keyType = null;
+        CollectionType type = CollectionType.NOT_COLLECTION;
         String nameType = schema.name_types.get(column.name);
         if (nameType==null) nameType = "AsciiType";
AbstractJdbcType<?> comparator = TypesMap.getTypeForComparator(nameType == null ? schema.default_name_type : nameType);
         String valueType = schema.value_types.get(column.name);
AbstractJdbcType<?> validator = TypesMap.getTypeForComparator(valueType == null ? schema.default_value_type : valueType);
-        nameType = columnname;
-        return new TypedColumn(column, comparator, validator);
+        if (validator == null)
+        {
+            int index = valueType.indexOf("(");
+            assert index > 0;
+
+            String collectionClass = valueType.substring(0, index);
+ if (collectionClass.endsWith("ListType")) type = CollectionType.LIST; + else if (collectionClass.endsWith("SetType")) type = CollectionType.SET; + else if (collectionClass.endsWith("MapType")) type = CollectionType.MAP;
+
+ String[] split = valueType.substring(index+1, valueType.length()-1).split(",");
+            if (split.length > 1)
+            {
+                keyType = TypesMap.getTypeForComparator(split[0]);
+                validator = TypesMap.getTypeForComparator(split[1]);
+            }
+            else validator = TypesMap.getTypeForComparator(split[0]);
+
+        }
+
+ TypedColumn tc = new TypedColumn(column, comparator, validator, keyType, type);
+        logger.debug("tc = "+tc);
+
+        return tc;
     }

     public boolean previous() throws SQLException
@@ -1021,8 +1119,7 @@
         fetchSize = size;
     }

-    @SuppressWarnings("unchecked")
-       public <T> T unwrap(Class<T> iface) throws SQLException
+    public <T> T unwrap(Class<T> iface) throws SQLException
     {
         if (iface.equals(CassandraResultSetExtras.class)) return (T) this;

@@ -1093,20 +1190,16 @@
return values.get(column - 1).getValueType().getClass().getSimpleName();
         }

-        @SuppressWarnings("unchecked")
-               public int getPrecision(int column) throws SQLException
+        public int getPrecision(int column) throws SQLException
         {
             checkIndex(column);
-            @SuppressWarnings("rawtypes")
-                       TypedColumn col = values.get(column - 1);
+            TypedColumn col = values.get(column - 1);
             return col.getValueType().getPrecision(col.getValue());
         }

-        @SuppressWarnings("unchecked")
-               public int getScale(int column) throws SQLException
+        public int getScale(int column) throws SQLException
         {
             checkIndex(column);
-            @SuppressWarnings("rawtypes")
             TypedColumn tc = values.get(column - 1);
             return tc.getValueType().getScale(tc.getValue());
         }
@@ -1134,14 +1227,14 @@
         public boolean isCaseSensitive(int column) throws SQLException
         {
             checkIndex(column);
-            TypedColumn<?> tc = values.get(column - 1);
+            TypedColumn tc = values.get(column - 1);
             return tc.getValueType().isCaseSensitive();
         }

         public boolean isCurrency(int column) throws SQLException
         {
             checkIndex(column);
-            TypedColumn<?> tc = values.get(column - 1);
+            TypedColumn tc = values.get(column - 1);
             return tc.getValueType().isCurrency();
         }

@@ -1175,7 +1268,7 @@
         public boolean isSigned(int column) throws SQLException
         {
             checkIndex(column);
-            TypedColumn<?> tc = values.get(column - 1);
+            TypedColumn tc = values.get(column - 1);
             return tc.getValueType().isSigned();
         }

=======================================
--- /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSetExtras.java Tue Oct 2 08:46:28 2012 +++ /src/main/java/org/apache/cassandra/cql/jdbc/CassandraResultSetExtras.java Sun Nov 4 19:51:06 2012
@@ -23,6 +23,9 @@
 import java.math.BigInteger;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;

 public interface CassandraResultSetExtras extends ResultSet
 {
@@ -35,9 +38,19 @@
     public BigInteger getBigInteger(int i) throws SQLException;
     /** @return a BigInteger value for the given column name */
     public BigInteger getBigInteger(String name) throws SQLException;
+
+    public List<?> getList(int index) throws SQLException;
+    public List<?> getList(String name) throws SQLException;
+
+    public Set<?> getSet(int index) throws SQLException;
+    public Set<?> getSet(String name) throws SQLException;
+
+    public Map<?,?> getMap(int index) throws SQLException;
+    public Map<?,?> getMap(String name) throws SQLException;
+

     /** @return the raw column data for the given column offset */
-    public TypedColumn<?> getColumn(int i) throws SQLException;
+    public TypedColumn getColumn(int i) throws SQLException;
     /** @return the raw column data for the given column name */
-    public TypedColumn<?> getColumn(String name) throws SQLException;
+    public TypedColumn getColumn(String name) throws SQLException;
 }
=======================================
--- /src/main/java/org/apache/cassandra/cql/jdbc/ColumnDecoder.java Tue Oct 2 08:46:28 2012 +++ /src/main/java/org/apache/cassandra/cql/jdbc/ColumnDecoder.java Sun Nov 4 19:51:06 2012
@@ -147,28 +147,28 @@
     }

     /** constructs a typed column */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
- public TypedColumn<?> makeCol(String keyspace, String columnFamily, Column column)
-    {
-        return new TypedColumn(column,
- getNameType(keyspace, columnFamily, column.name), - getValueType(keyspace, columnFamily, column.name));
-    }
+//     @SuppressWarnings({ "rawtypes", "unchecked" })
+// public TypedColumn makeCol(String keyspace, String columnFamily, Column column)
+//    {
+//        return new TypedColumn(column,
+// getNameType(keyspace, columnFamily, column.name), +// getValueType(keyspace, columnFamily, column.name));
+//    }

     /** constructs a typed column to hold the key
      * @throws SQLNonTransientException */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
- public TypedColumn<?> makeKeyColumn(String keyspace, String columnFamily, byte[] key) throws SQLNonTransientException
-    {
- CFamMeta cf = metadata.get(String.format("%s.%s", keyspace, columnFamily));
-        if (cf == null)
- throw new SQLNonTransientException(String.format("could not find decoder metadata for: %s.%s", - keyspace, - columnFamily));
-
- Column column = new Column(cf.keyAlias).setValue(key).setTimestamp(-1);
-        return new TypedColumn(column,
- getNameType(keyspace, columnFamily, (cf.keyAlias != null) ? cf.keyAlias : DEFAULT_KEY_NAME), - getValueType(keyspace, columnFamily, (cf.keyAlias != null) ? cf.keyAlias : DEFAULT_KEY_NAME));
-    }
+//    @SuppressWarnings({ "unchecked", "rawtypes" })
+// public TypedColumn makeKeyColumn(String keyspace, String columnFamily, byte[] key) throws SQLNonTransientException
+//    {
+// CFamMeta cf = metadata.get(String.format("%s.%s", keyspace, columnFamily));
+//        if (cf == null)
+// throw new SQLNonTransientException(String.format("could not find decoder metadata for: %s.%s", +// keyspace, +// columnFamily));
+//
+// Column column = new Column(cf.keyAlias).setValue(key).setTimestamp(-1);
+//        return new TypedColumn(column,
+// getNameType(keyspace, columnFamily, (cf.keyAlias != null) ? cf.keyAlias : DEFAULT_KEY_NAME), +// getValueType(keyspace, columnFamily, (cf.keyAlias != null) ? cf.keyAlias : DEFAULT_KEY_NAME));
+//    }
 }
=======================================
--- /src/main/java/org/apache/cassandra/cql/jdbc/TypedColumn.java Thu Oct 18 08:43:37 2012 +++ /src/main/java/org/apache/cassandra/cql/jdbc/TypedColumn.java Sun Nov 4 19:51:06 2012
@@ -27,32 +27,64 @@
 import org.apache.cassandra.utils.ByteBufferUtil;


-public class TypedColumn<T>
+public class TypedColumn
 {
+    public enum CollectionType {NOT_COLLECTION,MAP,LIST,SET};
+
     private final Column rawColumn;

// we cache the frequently-accessed forms: java object for value, String for name.
     // Note that {N|V}.toString() isn't always the same as Type.getString
     // (a good example is byte buffers).
-    private final T value;
+    private final Object value;
     private final String nameString;
-    private final AbstractJdbcType<T> nameType, valueType;
-
- public TypedColumn(Column column, AbstractJdbcType<T> comparator, AbstractJdbcType<T> validator)
+    private final AbstractJdbcType<?> nameType, valueType, keyType;
+    private final CollectionType collectionType;
+
+ public TypedColumn(Column column, AbstractJdbcType<?> comparator, AbstractJdbcType<?> validator)
+    {
+ this(column,comparator, validator, null, CollectionType.NOT_COLLECTION);
+    }
+
+ public TypedColumn(Column column, AbstractJdbcType<?> nameType, AbstractJdbcType<?> valueType, AbstractJdbcType<?> keyType, CollectionType type)
     {
         rawColumn = column;
- this.value = (column.value == null | | !column.value.hasRemaining()) ? null : validator.compose(column.value);
-        nameString = comparator.getString(column.name);
-        nameType = comparator;
-        valueType = validator;
+        this.collectionType = type;
+        this.nameType = nameType;
+        this.nameString = nameType.getString(column.name);
+        this.valueType = valueType;
+        this.keyType = keyType;
+
+        if (column.value == null || !column.value.hasRemaining())
+        {
+            this.value = null;
+        }
+        else switch(collectionType)
+        {
+            case NOT_COLLECTION:
+                this.value =  valueType.compose(column.value);
+                break;
+            case LIST:
+ value = ListMaker.getInstance(valueType).compose(column.value);
+                break;
+            case SET:
+ value = SetMaker.getInstance(valueType).compose(column.value);
+                break;
+            case MAP:
+ value = MapMaker.getInstance(keyType, valueType).compose(column.value);
+                break;
+           default:
+                value = null;
+        }
     }
+

     public Column getRawColumn()
     {
         return rawColumn;
     }

-    public T getValue()
+    public Object getValue()
     {
         return value;
     }
@@ -67,25 +99,34 @@
         return valueType.getString(rawColumn.value);
     }

-    public AbstractJdbcType<T> getNameType()
+    public AbstractJdbcType getNameType()
     {
-        return nameType;
+        return  nameType;
     }

-    public AbstractJdbcType<T> getValueType()
+    public AbstractJdbcType getValueType()
     {
         return valueType;
     }
+
+    public CollectionType getCollectionType()
+    {
+        return collectionType;
+    }
+

     public String toString()
     {
- return String.format("TypedColumn [rawColumn=%s, value=%s, nameString=%s, nameType=%s, valueType=%s]", + return String.format("TypedColumn [rawColumn=%s, value=%s, nameString=%s, nameType=%s, valueType=%s, keyType=%s, collectionType=%s]",
             displayRawColumn(rawColumn),
             value,
             nameString,
             nameType,
-            valueType);
+            valueType,
+            keyType,
+            collectionType);
     }
+
     private String displayRawColumn(Column column)
     {
         String name;
=======================================
--- /src/main/java/org/apache/cassandra/cql/jdbc/Utils.java Tue Jun 26 08:48:39 2012 +++ /src/main/java/org/apache/cassandra/cql/jdbc/Utils.java Sun Nov 4 19:51:06 2012
@@ -28,6 +28,7 @@
 import java.sql.SQLException;
 import java.sql.SQLNonTransientConnectionException;
 import java.sql.SQLSyntaxErrorException;
+import java.util.List;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -278,4 +279,25 @@
         if (isUpdate.matches()) cf = isUpdate.group(1);
         return cf;
     }
+
+    // Utility method
+    /**
+ * Utility method to pack bytes into a byte buffer from a list of ByteBuffers
+     *
+ * @param buffers A list of ByteBuffers representing the elements to pack
+     * @param elements The count of the elements
+     * @param size The size in bytes of the result buffer
+     * @return The packed ByteBuffer
+     */
+ protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+    {
+        ByteBuffer result = ByteBuffer.allocate(2 + size);
+        result.putShort((short)elements);
+        for (ByteBuffer bb : buffers)
+        {
+            result.putShort((short)bb.remaining());
+            result.put(bb.duplicate());
+        }
+        return (ByteBuffer)result.flip();
+    }
 }
=======================================
--- /src/test/java/org/apache/cassandra/cql/jdbc/DataSourceTest.java Sun Oct 14 20:59:55 2012 +++ /src/test/java/org/apache/cassandra/cql/jdbc/DataSourceTest.java Sun Nov 4 19:51:06 2012
@@ -63,6 +63,7 @@

         // Create KeySpace
String createKS = String.format("CREATE KEYSPACE \"%s\" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",KEYSPACE); +// String createKS = String.format("CREATE KEYSPACE %s WITH strategy_class = SimpleStrategy AND strategy_options:replication_factor = 1;",KEYSPACE);
         stmt.execute(createKS);
     }

=======================================
--- /src/test/java/org/apache/cassandra/cql/jdbc/SpashScreenTest.java Sun Oct 14 20:59:55 2012 +++ /src/test/java/org/apache/cassandra/cql/jdbc/SpashScreenTest.java Sun Nov 4 19:51:06 2012
@@ -43,7 +43,7 @@
     public static void setUpBeforeClass() throws Exception
     {
         Class.forName("org.apache.cassandra.cql.jdbc.CassandraDriver");
- con = DriverManager.getConnection(String.format("jdbc:cassandra://%s:%d/%s",HOST,PORT,"system")); + con = DriverManager.getConnection(String.format("jdbc:cassandra://%s:%d/%s?version=3.0.0",HOST,PORT,"system"));
         Statement stmt = con.createStatement();

         // Drop Keyspace
=======================================
--- /src/test/resources/log4j.properties        Sat Dec 17 20:01:00 2011
+++ /src/test/resources/log4j.properties        Sun Nov  4 19:51:06 2012
@@ -1,7 +1,7 @@
 #  Test Log4J Properties File

 log4j.rootLogger=WARN, stdout
-log4j.logger.org.apache.cassandra.cql.jdbc=INFO
+log4j.logger.org.apache.cassandra.cql.jdbc=DEBUG

 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

Reply via email to