Author: brandonwilliams
Date: Tue Apr 5 22:00:00 2011
New Revision: 1089261

URL: http://svn.apache.org/viewvc?rev=1089261&view=rev
Log:
Optimize schema fetch/store.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2421

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1089261&r1=1089260&r2=1089261&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Tue Apr 5 22:00:00 2011
@@ -168,7 +168,7 @@ public class CassandraStorage extends Lo
     private CfDef getCfDef()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(ResourceSchema.class);
+        Properties property = context.getUDFProperties(CassandraStorage.class);
         return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
     }
 
@@ -314,6 +314,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
+        initSchema();
     }
 
     public OutputFormat getOutputFormat()
@@ -443,41 +444,46 @@ public class CassandraStorage extends Lo
 
     private void initSchema()
     {
-        Cassandra.Client client = null;
-        try
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
         {
-            client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
-            CfDef cfDef = null;
-            client.set_keyspace(keyspace);
-            KsDef ksDef = client.describe_keyspace(keyspace);
-            List<CfDef> defs = ksDef.getCf_defs();
-            for (CfDef def : defs)
+            Cassandra.Client client = null;
+            try
             {
-                if (column_family.equalsIgnoreCase(def.getName()))
+                client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+                CfDef cfDef = null;
+                client.set_keyspace(keyspace);
+                KsDef ksDef = client.describe_keyspace(keyspace);
+                List<CfDef> defs = ksDef.getCf_defs();
+                for (CfDef def : defs)
                 {
-                    cfDef = def;
-                    break;
+                    if (column_family.equalsIgnoreCase(def.getName()))
+                    {
+                        cfDef = def;
+                        break;
+                    }
                 }
+                property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InvalidRequestException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (NotFoundException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
             }
-            UDFContext context = UDFContext.getUDFContext();
-            Properties property = context.getUDFProperties(ResourceSchema.class);
-            property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (NotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
         }
     }
