[ 
https://issues.apache.org/jira/browse/PHOENIX-900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324939#comment-14324939
 ] 

ASF GitHub Bot commented on PHOENIX-900:
----------------------------------------

Github user elilevine commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/37#discussion_r24857039
  
    --- Diff: 
phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Copyright 2014 The Apache Software Foundation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + *distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you maynot use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicablelaw or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.execute;
    +
    +import static com.google.common.collect.Lists.newArrayList;
    +import static com.google.common.collect.Sets.newHashSet;
    +import static java.util.Collections.singleton;
    +import static java.util.Collections.singletonList;
    +import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
    +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
    +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
    +import static 
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
    +import static 
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
    +import static 
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
    +import static org.apache.phoenix.util.TestUtil.LOCALHOST;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +import java.sql.Connection;
    +import java.sql.Driver;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseIOException;
    +import org.apache.hadoop.hbase.HBaseTestingUtility;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    +import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
    +import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
    +import org.apache.phoenix.hbase.index.Indexer;
    +import org.apache.phoenix.jdbc.PhoenixConnection;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.util.PhoenixRuntime;
    +import org.apache.phoenix.util.ReadOnlyProps;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +
    +@Category(NeedsOwnMiniClusterTest.class)
    +public class PartialCommitIT {
    +    
    +    private static final String TABLE_NAME_TO_FAIL = 
"b_failure_table".toUpperCase();
    +    private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
    +    private static final String UPSERT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
    +    private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
    +    private static final String DELETE_TO_FAIL = "delete from " + 
TABLE_NAME_TO_FAIL + "  where k='z'";
    +    private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
    +    private static String url;
    +    private static Driver driver;
    +    private static final Properties props = new Properties();
    +    
    +    static {
    +        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
    +    }
    +    
    +    @BeforeClass
    +    public static void setupCluster() throws Exception {
    +      Configuration conf = TEST_UTIL.getConfiguration();
    +      setUpConfigForMiniCluster(conf);
    +      conf.setClass("hbase.coprocessor.region.classes", 
FailingRegionObserver.class, RegionObserver.class);
    +      conf.setBoolean("hbase.coprocessor.abortonerror", false);
    +      conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
    +      TEST_UTIL.startMiniCluster();
    +      String clientPort = 
TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
    +      url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + 
JDBC_PROTOCOL_SEPARATOR + clientPort
    +              + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
    +
    +      Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
    +      // Must update config before starting server
    +      props.put(QueryServices.DROP_METADATA_ATTRIB, 
Boolean.toString(true));
    +      driver = initAndRegisterDriver(url, new 
ReadOnlyProps(props.entrySet().iterator()));
    +      createTablesWithABitOfData();
    +    }
    +    
    +    private static void createTablesWithABitOfData() throws Exception {
    +        Properties props = new Properties();
    +        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
    +
    +        try (Connection con = driver.connect(url, new Properties())) {
    +            Statement sta = con.createStatement();
    +            sta.execute("create table a_success_table (k varchar primary 
key, c varchar)");
    +            sta.execute("create table b_failure_table (k varchar primary 
key, c varchar)");
    +            sta.execute("create table c_success_table (k varchar primary 
key, c varchar)");
    +            con.commit();
    +        }
    +
    +        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
    +
    +        try (Connection con = driver.connect(url, new Properties())) {
    +            con.setAutoCommit(false);
    +            Statement sta = con.createStatement();
    +            for (String table : newHashSet("a_success_table", 
TABLE_NAME_TO_FAIL, "c_success_table")) {
    +                sta.execute("upsert into " + table + " values ('z', 'z')");
    +                sta.execute("upsert into " + table + " values ('zz', 
'zz')");
    +                sta.execute("upsert into " + table + " values ('zzz', 
'zzz')");
    +            }
    +            con.commit();
    +        }
    +    }
    +    
    +    @AfterClass
    +    public static void teardownCluster() throws Exception {
    +      TEST_UTIL.shutdownMiniCluster();
    +    }
    +    
    +    @Test
    +    public void testNoFailure() {
    +        testPartialCommit(singletonList("upsert into a_success_table 
values ('testNoFailure', 'a')"), 0, Collections.<Integer>emptySet(), false,
    +                                        singletonList("select count(*) 
from a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
    +    }
    +    
    +    @Test
    +    public void testUpsertFailure() {
    +        testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertFailure1', 'a')", 
    +                                       UPSERT_TO_FAIL, 
    +                                       "upsert into a_success_table values 
('testUpsertFailure2', 'b')"), 
    +                                       1, singleton(new Integer(1)), true,
    +                                       newArrayList("select count(*) from 
a_success_table where k like 'testUpsertFailure_'",
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
    +                                       newArrayList(new Integer(2), new 
Integer(0)));
    +    }
    +    
    +    @Test
    +    public void testUpsertSelectFailure() throws SQLException {
    +        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
    +
    +        try (Connection con = driver.connect(url, new Properties())) {
    +            con.createStatement().execute("upsert into a_success_table 
values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
    +            con.commit();
    +        }
    +        
    +        testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertSelectFailure', 'a')", 
    +                                       UPSERT_SELECT_TO_FAIL), 
    +                                       1, singleton(new Integer(1)), true,
    +                                       newArrayList("select count(*) from 
a_success_table where k in ('testUpsertSelectFailure', '" + 
Bytes.toString(ROW_TO_FAIL) + "')",
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
    +                                       newArrayList(new Integer(2), new 
Integer(0)));
    +    }
    +    
    +    @Test
    +    public void testDeleteFailure() {
    +        testPartialCommit(newArrayList("upsert into a_success_table values 
('testDeleteFailure1', 'a')", 
    +                                       DELETE_TO_FAIL,
    +                                       "upsert into a_success_table values 
('testDeleteFailure2', 'b')"), 
    +                                       1, singleton(new Integer(1)), true,
    +                                       newArrayList("select count(*) from 
a_success_table where k like 'testDeleteFailure_'",
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
    +                                       newArrayList(new Integer(2), new 
Integer(1)));
    +    }
    +    
    +    /**
    +     * {@link MutationState} keeps mutations ordered lexicographically by 
table name.
    +     */
    +    @Test
    +    public void testOrderOfMutationsIsPredicatable() {
    +        testPartialCommit(newArrayList("upsert into c_success_table values 
('testOrderOfMutationsIsPredicatable', 'c')", // will fail because 
c_success_table is after b_failure_table by table sort order
    +                                       UPSERT_TO_FAIL, 
    +                                       "upsert into a_success_table values 
('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because 
a_success_table is before b_failure_table by table sort order
    +                                       2, newHashSet(new Integer(1), new 
Integer(0)), true,
    +                                       newArrayList("select count(*) from 
c_success_table where k='testOrderOfMutationsIsPredicatable'",
    +                                                    "select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable'",
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
    +                                       newArrayList(new Integer(0), new 
Integer(1), new Integer(0)));
    +    }
    +    
    +    @Test
    +    public void checkThatAllStatementTypesMaintainOrderInConnection() {
    +        testPartialCommit(newArrayList("upsert into a_success_table values 
('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
    +                                       "upsert into a_success_table select 
k, c from c_success_table",
    +                                       DELETE_TO_FAIL,
    +                                       "select * from a_success_table", 
    +                                       UPSERT_TO_FAIL), 
    +                                       2, newHashSet(new Integer(2), new 
Integer(4)), true,
    +                                       newArrayList("select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", 
// rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
    +                                                    "select count(*) from 
" + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
    +                                       newArrayList(new Integer(4), new 
Integer(0), new Integer(1)));
    +    }
    +    
    +    private void testPartialCommit(List<String> statements, int 
failureCount, Set<Integer> orderOfFailedStatements, boolean willFail,
    +                                   List<String> 
countStatementsForVerification, List<Integer> expectedCountsForVerification) {
    +        Preconditions.checkArgument(countStatementsForVerification.size() 
== expectedCountsForVerification.size());
    +        
    +        try (Connection con = driver.connect(url, new Properties())) {
    +            con.setAutoCommit(false);
    +            Statement sta = con.createStatement();
    +            for (String statement : statements) {
    +                sta.execute(statement);
    +            }
    +            try {
    +                con.commit();
    +                if (willFail) {
    +                    fail("Expected at least one statement in the list to 
fail");
    +                } else {
    +                    assertEquals(0, 
con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should 
have been reset to 0 in commit()
    +                }
    +            } catch (SQLException sqle) {
    +                if (!willFail) {
    +                    fail("Expected no statements to fail");
    +                }
    +                assertEquals(CommitException.class, sqle.getClass());
    +                Set<Integer> failures = 
((CommitException)sqle).getUncommittedStatementIndexes();
    +                assertEquals(failureCount, failures.size());
    +                assertEquals(orderOfFailedStatements, failures);
    +            }
    +            
    +            // verify data in HBase
    +            for (int i = 0; i < countStatementsForVerification.size(); 
i++) {
    +                String countStatement = 
countStatementsForVerification.get(i);
    +                ResultSet rs = sta.executeQuery(countStatement);
    +                if (!rs.next()) {
    +                    fail("Expected a single row from count query");
    +                }
    +                
assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
    +            }
    +        } catch (SQLException e) {
    +            fail(e.toString());
    +        }
    +    }
    +    
    +    public static class FailingRegionObserver extends SimpleRegionObserver 
{
    --- End diff --
    
    @jyates, @apurtell FYI, this is how I hooked up a coprocessor to simulate 
failure of specific rows.


> Partial results for mutations
> -----------------------------
>
>                 Key: PHOENIX-900
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-900
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 3.0.0, 4.0.0
>            Reporter: Eli Levine
>            Assignee: Eli Levine
>         Attachments: PHOENIX-900.patch
>
>
> HBase provides a way to retrieve partial results of a batch operation: 
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#batch%28java.util.List,%20java.lang.Object[]%29
> Chatted with James about this offline:
> Yes, this could be included in the CommitException we throw 
> (MutationState:412). We already include the batches that have been 
> successfully committed to the HBase server in this exception. Would you be up 
> for adding this additional information? You'd want to surface this in a 
> Phoenix-y way in a method on CommitException, something like this: ResultSet 
> getPartialCommits(). You can easily create an in memory ResultSet using 
> MaterializedResultIterator plus the PhoenixResultSet constructor that accepts 
> this (just create a new empty PhoenixStatement with the PhoenixConnection for 
> the other arg).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to