http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/test/java/SampleData.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/test/java/SampleData.java b/partition/common-query/src/test/java/SampleData.java new file mode 100644 index 0000000..071076b --- /dev/null +++ b/partition/common-query/src/test/java/SampleData.java @@ -0,0 +1,228 @@ +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.Connector; +import cloudbase.core.client.Instance; +import cloudbase.core.client.MultiTableBatchWriter; +import cloudbase.core.client.TableExistsException; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.client.mock.MockInstance; +import cloudbase.core.data.Mutation; +import cloudbase.core.security.Authorizations; + + +public class SampleData { + public static int NUM_PARTITIONS = 2; + public static int NUM_SAMPLES = 10; + + public static Connector initConnector() { + Instance instance = new MockInstance(); + + try { + Connector connector = instance.getConnector("root", "password".getBytes()); + + // set up table + connector.tableOperations().create("partition"); + connector.tableOperations().create("provenance"); + + // set up root's auths + connector.securityOperations().changeUserAuthorizations("root", new Authorizations("ALPHA,BETA,GAMMA".split(","))); + + return connector; + } catch (CBException e) { + e.printStackTrace(); + } catch (CBSecurityException e) { + e.printStackTrace(); + } catch (TableExistsException e) { + e.printStackTrace(); + } + + return null; + } + + public static Collection<Map<String, String>> sampleData() { + 
List<Map<String, String>> list = new ArrayList<Map<String, String>>(); + Map<String, String> item; + + for (int i = 0; i < NUM_SAMPLES; i++) { + item = new HashMap<String, String>(); + for (int j = 0; j < 5; j++) { + item.put("field" + j , new String(new char[] {(char) ('A' + ((j + i) % 26))})); + } + list.add(item); + } + return list; + } + + public static void writeDenCellLevel(Connector connector, Collection<Map<String, String>> data) { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1); + try { + BatchWriter writer; + if (mtbw != null) { + writer = mtbw.getBatchWriter("partition"); + } else { + writer = connector.createBatchWriter("partition", 200000, 10000, 1); + } + int count = 0; + Mutation m; + for (Map<String, String> object: data) { + count++; + String id = (count < 10 ? "0" + count: "" + count); + Text partition = new Text("" + (count % NUM_PARTITIONS)); + + // write dummy record + m = new Mutation(partition); + m.put("event", id, ""); + writer.addMutation(m); + + for (Entry<String, String> entry: object.entrySet()) { + // write the event mutation + m = new Mutation(partition); + m.put("event", id + "\u0000" + entry.getKey(), entry.getValue()); + writer.addMutation(m); + + // write the general index mutation + m = new Mutation(partition); + m.put("index", entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + + // write the specific index mutation + m = new Mutation(partition); + m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + } + } + writer.close(); + } catch (CBException e) { + e.printStackTrace(); + } catch (CBSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + } + + public static void writeDenSerialized(Connector connector, Collection<Map<String, String>> data) { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 
10000, 1); + try { + BatchWriter writer; + if (mtbw != null) { + writer = mtbw.getBatchWriter("partition"); + } else { + writer = connector.createBatchWriter("partition", 200000, 10000, 1); + } + int count = 0; + Mutation m; + for (Map<String, String> object: data) { + count++; + String id = (count < 10 ? "0" + count: "" + count); + Text partition = new Text("" + (count % NUM_PARTITIONS)); + + StringBuilder value = new StringBuilder(); + boolean first = true; + for (Entry<String, String> entry: object.entrySet()) { + if (!first) { + value.append("\u0000"); + } else { + first = false; + } + value.append(entry.getKey()); + value.append("\uFFFD"); + value.append(entry.getValue()); + + // write the general index mutation + m = new Mutation(partition); + m.put("index", entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + + // write the specific index mutation + m = new Mutation(partition); + m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + } + + // write the event mutation + m = new Mutation(partition); + m.put("event", id, value.toString()); + writer.addMutation(m); + } + writer.close(); + } catch (CBException e) { + e.printStackTrace(); + } catch (CBSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + } + + public static void writeDenProvenance(Connector connector) { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1); + try { + BatchWriter writer; + if (mtbw != null) { + writer = mtbw.getBatchWriter("provenance"); + } else { + writer = connector.createBatchWriter("provenance", 200000, 10000, 1); + } + Mutation m; + for (int sid = 1; sid <= 2; sid++) { + for (int time = 1; time <= 3; time++) { + for (int uuid = 1; uuid <= (6 + 2 * time); uuid++) { + m = new Mutation(new Text("sid" + sid)); + m.put("time" + time, "uuid-" + Integer.toHexString(uuid), ""); + writer.addMutation(m); + } + 
} + } + writer.close(); + } catch (CBException e) { + e.printStackTrace(); + } catch (CBSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + } + + public static void writeMinIndexes(Connector connector) { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1); + try { + BatchWriter writer; + if (mtbw != null) { + writer = mtbw.getBatchWriter("partition"); + } else { + writer = connector.createBatchWriter("partition", 200000, 10000, 1); + } + Mutation m; + for (int i = 1; i <= NUM_SAMPLES; i++) { + m = new Mutation(new Text("" + (i % NUM_PARTITIONS))); + + String id = (i < 10 ? "0" + i: "" + i); + + m.put("index", "z_" + id + "_rdate\u0000" + id, ""); + writer.addMutation(m); + } + writer.close(); + } catch (CBException e) { + e.printStackTrace(); + } catch (CBSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/test/java/SampleGVData.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/test/java/SampleGVData.java b/partition/common-query/src/test/java/SampleGVData.java new file mode 100644 index 0000000..d8168de --- /dev/null +++ b/partition/common-query/src/test/java/SampleGVData.java @@ -0,0 +1,182 @@ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.Connector; +import cloudbase.core.client.Instance; +import cloudbase.core.client.MultiTableBatchWriter; +import cloudbase.core.client.TableExistsException; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.client.mock.MockInstance; +import cloudbase.core.data.Mutation; +import cloudbase.core.security.Authorizations; + +// For use in testing the Date Filter and Frequency Filter classes +public class SampleGVData +{ + + public static int NUM_PARTITIONS = 2; + + + public static Connector initConnector() + { + Instance instance = new MockInstance(); + + try + { + Connector connector = instance.getConnector("root", "password".getBytes()); + + // set up table + connector.tableOperations().create("partition"); + + // set up root's auths + connector.securityOperations().changeUserAuthorizations("root", new Authorizations("ALPHA,BETA,GAMMA".split(","))); + + return connector; + } + catch (CBException e) + { + e.printStackTrace(); + } + catch (CBSecurityException e) + { + e.printStackTrace(); + } + catch (TableExistsException e) + { + e.printStackTrace(); + } + + return null; + } + + public static Collection<Map<String, String>> 
sampleData() + { + List<Map<String, String>> list = new ArrayList<Map<String, String>>(); + Map<String, String> item; + + item = new HashMap<String, String>(); + item.put("a", "a"); + item.put("b", "b"); + + //This one is like RB + item.put("date-start", "2009-01-01"); + item.put("date-end", "2011-02-24"); + item.put("date-update", "2011-02-24T00:00:00Z"); + item.put("frequency", "1250000000"); + item.put("bandwidth", "500000000"); + item.put("version", "1"); + list.add(item); + + item = new HashMap<String, String>(); + item.put("a", "a"); + item.put("b", "b"); + list.add(item); + + //This one is like GV + item = new HashMap<String, String>(); + item.put("a", "a"); + item.put("b", "b"); + item.put("date-start", "2010-01-01"); + item.put("date-update", "2010-01-23"); + item.put("frequency", "1150000000"); + item.put("bandwidth", "300000000"); + list.add(item); + + item = new HashMap<String, String>(); + item.put("a", "a"); + item.put("b", "b"); + item.put("date-start", "2009-01-01"); + item.put("date-end", "2011-02-24"); + item.put("date-update", "2008-01-23"); + list.add(item); + + item = new HashMap<String, String>(); + item.put("a", "a"); + item.put("b", "b"); + list.add(item); + + return list; + } + + + public static void writeDenSerialized(Connector connector, Collection<Map<String, String>> data) + { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1); + try + { + BatchWriter writer; + if (mtbw != null) + { + writer = mtbw.getBatchWriter("partition"); + } + else + { + writer = connector.createBatchWriter("partition", 200000, 10000, 1); + } + int count = 0; + Mutation m; + for (Map<String, String> object : data) + { + count++; + String id = (count < 10 ? 
"0" + count : "" + count); + Text partition = new Text("" + (count % NUM_PARTITIONS)); + + StringBuilder value = new StringBuilder(); + boolean first = true; + for (Entry<String, String> entry : object.entrySet()) + { + if (!first) + { + value.append("\u0000"); + } + else + { + first = false; + } + value.append(entry.getKey()); + value.append("\uFFFD"); + value.append(entry.getValue()); + + // write the general index mutation + m = new Mutation(partition); + m.put("index", entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + + // write the specific index mutation + m = new Mutation(partition); + m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + } + + // write the event mutation + m = new Mutation(partition); + m.put("event", id, value.toString()); + writer.addMutation(m); + } + writer.close(); + } + catch (CBException e) + { + e.printStackTrace(); + } + catch (CBSecurityException e) + { + e.printStackTrace(); + } + catch (TableNotFoundException e) + { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/test/java/SampleJTSData.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/test/java/SampleJTSData.java b/partition/common-query/src/test/java/SampleJTSData.java new file mode 100644 index 0000000..41df658 --- /dev/null +++ b/partition/common-query/src/test/java/SampleJTSData.java @@ -0,0 +1,171 @@ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.Connector; +import cloudbase.core.client.Instance; +import 
cloudbase.core.client.MultiTableBatchWriter; +import cloudbase.core.client.TableExistsException; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.client.mock.MockInstance; +import cloudbase.core.data.Mutation; +import cloudbase.core.security.Authorizations; + +// For use in testing the Date Filter and Frequency Filter classes +public class SampleJTSData +{ + + public static int NUM_PARTITIONS = 2; + + + public static Connector initConnector() + { + Instance instance = new MockInstance(); + + try + { + Connector connector = instance.getConnector("root", "password".getBytes()); + + // set up table + connector.tableOperations().create("partition"); + + // set up root's auths + connector.securityOperations().changeUserAuthorizations("root", new Authorizations("ALPHA,BETA,GAMMA".split(","))); + + return connector; + } + catch (CBException e) + { + e.printStackTrace(); + } + catch (CBSecurityException e) + { + e.printStackTrace(); + } + catch (TableExistsException e) + { + e.printStackTrace(); + } + + return null; + } + + public static Collection<Map<String, String>> sampleData() + { + List<Map<String, String>> list = new ArrayList<Map<String, String>>(); + Map<String, String> item; + + item = new HashMap<String, String>(); + item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(91.985, -12.108, 94.657, -12.059, 98.486, -11.988, 101.385, -12.296, 102.911, -12.569, 103.93, -12.852, 105.005, -12.531, 106.37, -12.204, 108.446, -11.503, 109.585, -10.88, 110.144, -10.207, 108.609, -9.573, 106.05, -8.535, 104.145, -7.606, 102.191, -7.522, 99.522, -7.691, 97.64, -7.606, 95.482, -7.947, 94.546, -8.084, 92.465, -8.605, 90.554, -9.366, 90.197, -10.436, 89.84, -11.729, 90.554, -12.175, 91.985, -12.108))"); + item.put("beam-name", "OPTUS D1 Ku-BAND NATIONAL A & B AUSTRALIA Downlink"); + list.add(item); + //This is Australia + //Points like 22S 135E are in the beam + + //This one is like GV + item 
= new HashMap<String, String>(); + item.put("beam-name", "AMC 1 Ku-BAND ZONAL NORTH AMERICA Down HV"); + item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(-70.838, 39.967, -70.506, 40.331, -70.698, 41.679, -71.179, 42.401, -71.578, 42.38, -72.994, 42.924, -74.353, 43.242, -75.715, 43.26, -77.318, 42.981, -78.684, 42.774, -80.05, 42.491, -82.005, 42.517, -83.608, 42.312, -84.977, 41.805, -86.58, 41.525, -88.127, 41.02, -89.731, 40.741, -90.905, 41.582, -92.264, 41.9, -93.861, 42.147, -95.411, 41.341, -96.257, 40.076, -97.222, 38.737, -98.011, 37.17, -98.031, 35.593, -97.691, 34.312, -96.875, 33.25, -97.307, 31.904, -97.916, 30.561, -98.702, 29.295, -99.134, 27.949, -98.14, 26.884, -97.205, 25.821, -95.842, 25.803, -94.42, 25.784, -92.876, 26.064, -91.277, 26.043, -90.085, 26.553, -88.729, 26.01, -87.38, 24.941, -86.031, 23.797, -84.616, 23.253, -83.256, 23.01, -81.887, 23.517, -80.866, 24.555, -80.254, 26.124, -79.642, 27.693, -78.444, 28.728, -77.486, 29.542, -76.463, 30.805, -76.088, 32.377, -75.656, 33.723, -76.051, 35.305, -75.442, 36.649, -74.426, 37.386, -73.228, 38.422, -72.032, 39.232, -70.838, 39.967))"); + list.add(item); + //This is North America + //Points 39°44'21.00"N 104°59'3.00"W (Denver) are in the footprint + + item = new HashMap<String, String>(); + item.put("beam-name", "testa"); + item.put("beam-footprint", "MULTIPOLYGON (((-169.286 40.431, -164.971 39.992, -155.397 38.482, -146.566 36.233, -136.975 32.539, -128.124 27.742, -121.946 24.548, -116.849 21.339, -112.156 17.479, -109.391 14.206, -107.301 11.715, -105.274 9.477, -103.443 8.229, -102.108 7.7, -99.109 7.428, -96.681 7.745, -93.894 8.843, -89.917 11.687, -85.953 15.017, -81.148 17.266, -78.145 17.986, -75.582 17.887, -68.1 17.987, -64.696 18.493, -61.445 19.38, -60.094 20.288, -59.315 21.564, -57.026 26.51, -55.089 30.962, -53.59 33.657, -52.495 34.691, -50.468 36.204, -46.146 38.672, -41.684 40.663, -37.914 42.055, -33.806 
43.082, -27.523 44.149, -21.645 44.96, -16.578 45.406, -13.807 45.771, -14.929 50.108, -16.186 53.919, -17.051 56.0, -18.388 58.824, -19.861 61.567, -21.807 64.188, -23.104 65.742, -25.28 67.904, -27.699 69.823, -28.955 70.728, -32.415 72.768, -34.968 73.998, -38.468 75.309, -48.292 73.025, -56.545 71.12, -64.023 70.474, -72.753 70.357, -78.41 70.827, -80.466 71.093, -82.412 71.876, -83.02 72.944, -83.175 74.04, -82.493 74.782, -82.412 75.552, -82.697 76.778, -84.041 78.398, -86.316 81.078, -104.098 80.819, -110.861 80.482, -115.73 80.17, -120.936 79.669, -125.84 79.176, -126.696 79.02, -134.316 77.732, -139.505 76.478, -144.823 74.826, -148.231 73.417, -151.517 71.687, -153.87 70.165, -154.536 69.672, -155.868 68.678, -156.482 68.098, -158.281 66.421, -159.716 64.804, -160.996 63.126, -161.878 61.786, -163.046 59.875, -164.369 57.254, -165.563 54.479, -166.73 51.089, -167.811 47.267, -168.581 44.041, -169.286 40.431)), ((-171.333 23.244, -171.523 18.894, -170.127 18.986, -161.559 18.555, -156.977 18.134, -153.574 18.116, -151.108 18.324, -149.947 18.45, -149.018 18.957, -148.515 19.822, -148.524 20.914, -149.018 21.766, -149.947 22.272, -152.185 23.054, -155.563 23.434, -158.075 23.75, -160.272 24.034, -162.184 24.008, -163.514 23.99, -164.595 23.976, -166.52 23.687, -169.159 23.18, -171.333 23.244)))"); + list.add(item); +// this point should be in there... 
+ // -164 40 - somewhere near hawaii + + item = new HashMap<String, String>(); + item.put("beam-name", "testb"); + item.put("beam-footprint", "POLYGON ((-140.153 34.772, -140.341 33.272, -137.024 33.026, -132.723 32.369, -130.947 31.916, -128.664 31.225, -125.293 29.612, -121.813 27.871, -118.699 25.892, -115.589 23.79, -112.593 21.875, -109.136 19.335, -106.939 16.701, -105.006 14.97, -104.195 14.407, -103.049 13.659, -100.363 12.717, -98.063 12.288, -94.299 11.612, -90.825 11.097, -87.997 11.584, -86.815 12.109, -86.163 12.893, -85.014 14.342, -83.804 15.788, -82.104 16.998, -80.413 17.269, -78.005 16.574, -76.181 16.531, -74.65 16.68, -73.552 17.392, -72.957 18.3, -72.917 19.651, -73.526 21.325, -74.913 23.018, -76.036 24.519, -76.159 26.428, -75.741 28.447, -74.257 30.072, -72.771 31.331, -70.517 34.328, -69.638 36.04, -68.624 39.467, -68.015 41.851, -67.607 43.501, -67.548 45.528, -67.586 47.308, -68.601 49.066, -69.868 50.07, -71.621 50.778, -73.285 50.888, -74.9 50.926, -76.994 50.975, -79.332 50.846, -81.066 50.887, -83.842 51.136, -86.569 51.016, -87.95 50.864, -90.831 50.563, -94.27 50.644, -98.068 50.733, -102.937 51.032, -106.455 51.484, -109.973 51.936, -114.119 52.402, -117.363 53.031, -119.899 53.276, -123.243 53.539, -127.017 54.427, -130.519 55.431, -133.643 56.058, -134.826 56.279, -135.354 55.029, -135.792 53.864, -136.168965072136 52.8279962761917, -136.169 52.828, -136.169497186166 52.8264970826432, -136.192 52.763, -136.556548517884 51.6453176911637, -136.703232746756 51.2152965828266, -136.781220290925 50.9919311116929, -136.793 50.959, -136.80274055379 50.9259886895048, -136.992 50.295, -137.200898649547 49.5808675274021, -137.202 49.581, -137.200962495599 49.5806459535167, -137.360714473458 49.0197683891632, -137.459 48.677, -137.462166719028 48.6649126473121, -137.471 48.634, -137.515105536699 48.4619710228524, -137.74710368039 47.5528216167105, -137.793718522461 47.3758260237407, -137.854 47.152, -137.977773277882 46.6610808974241, 
-138.044 46.403, -138.330834102374 45.1674736036557, -138.365 45.019, -138.38180854655 44.9421315900087, -138.449801069917 44.6389849661384, -138.485 44.484, -138.497077239724 44.4262941289417, -138.536 44.25, -138.622787032392 43.8206200438395, -138.743816168807 43.232032787661, -138.981390224617 42.0843314825185, -138.989 42.048, -138.990605533614 42.0389442888447, -138.991 42.037, -138.997785044232 41.9994454595406, -139.004 41.969, -139.035645873997 41.7890661698517, -139.061212567475 41.6462082823816, -139.428 39.584, -139.673 38.073, -139.713116752585 37.8001474769807, -139.766 37.457, -139.764942047737 37.4567768906428, -139.898 36.573, -139.897723683259 36.5729429963606, -139.986 35.994, -140.04777653037 35.5462970502163, -140.094 35.232, -140.090797568766 35.2315144621917, -140.153 34.772))"); + list.add(item); + + + + //London is in neither - 51°30'0.00"N 0° 7'0.00"W + return list; + } + + + public static void writeDenSerialized(Connector connector, Collection<Map<String, String>> data) + { + // write sample data + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1); + try + { + BatchWriter writer; + if (mtbw != null) + { + writer = mtbw.getBatchWriter("partition"); + } + else + { + writer = connector.createBatchWriter("partition", 200000, 10000, 1); + } + int count = 0; + Mutation m; + for (Map<String, String> object : data) + { + count++; + String id = (count < 10 ? 
"0" + count : "" + count); + Text partition = new Text("" + (count % NUM_PARTITIONS)); + + StringBuilder value = new StringBuilder(); + boolean first = true; + for (Entry<String, String> entry : object.entrySet()) + { + if (!first) + { + value.append("\u0000"); + } + else + { + first = false; + } + value.append(entry.getKey()); + value.append("\uFFFD"); + value.append(entry.getValue()); + + // write the general index mutation + m = new Mutation(partition); + m.put("index", entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + + // write the specific index mutation + m = new Mutation(partition); + m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, ""); + writer.addMutation(m); + } + + // write the event mutation + m = new Mutation(partition); + m.put("event", id, value.toString()); + writer.addMutation(m); + } + writer.close(); + } + catch (CBException e) + { + e.printStackTrace(); + } + catch (CBSecurityException e) + { + e.printStackTrace(); + } + catch (TableNotFoundException e) + { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/filter.txt ---------------------------------------------------------------------- diff --git a/partition/iterator-test/filter.txt b/partition/iterator-test/filter.txt new file mode 100644 index 0000000..b53773f --- /dev/null +++ b/partition/iterator-test/filter.txt @@ -0,0 +1,6 @@ +<BBOX> + <gml:Envelope> + <gml:LowerCorner>119 33</gml:LowerCorner> + <gml:UpperCorner>120 34</gml:UpperCorner> + </gml:Envelope> +</BBOX> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/pom.xml ---------------------------------------------------------------------- diff --git a/partition/iterator-test/pom.xml b/partition/iterator-test/pom.xml new file mode 100644 index 0000000..daed27f --- /dev/null +++ b/partition/iterator-test/pom.xml @@ -0,0 +1,99 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>dss</artifactId> + <groupId>dss</groupId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <groupId>dss.webservice</groupId> + <artifactId>iterator-test</artifactId> + <packaging>jar</packaging> + <name>webservice-test</name> + <version>0.2.0-SNAPSHOT</version> + <description /> + + <properties> + <env>USER</env> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>dss.webservice.itr.Main</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>unpack-dependencies</id> + <phase>generate-resources</phase> + <goals> + <goal>unpack-dependencies</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + <resources> + <resource> + <directory>${basedir}/target/dependency</directory> + </resource> + </resources> + </build> + <dependencies> + <dependency> + <groupId>cloudbase</groupId> + <artifactId>cloudbase-core</artifactId> + <version>1.3.1</version> + </dependency> + <dependency> + <groupId>cloudbase</groupId> + <artifactId>cloudbase-start</artifactId> + <version>1.3.1</version> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>thrift</artifactId> + <version>0.3</version> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + <scope>provided</scope> + <version>2.5</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.0.4</version> + 
</dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>0.20.1</version> + </dependency> + <dependency> + <groupId>org.apache</groupId> + <artifactId>zookeeper</artifactId> + <version>3.3.0</version> + </dependency> + <dependency> + <groupId>sitestore.common</groupId> + <artifactId>common-query</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/src/main/java/dss/webservice/itr/Main.java ---------------------------------------------------------------------- diff --git a/partition/iterator-test/src/main/java/dss/webservice/itr/Main.java b/partition/iterator-test/src/main/java/dss/webservice/itr/Main.java new file mode 100644 index 0000000..6b040fc --- /dev/null +++ b/partition/iterator-test/src/main/java/dss/webservice/itr/Main.java @@ -0,0 +1,348 @@ +package dss.webservice.itr; + +import java.io.File; +import java.util.HashSet; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; +import java.util.TreeMap; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import ss.cloudbase.core.iterators.CellLevelFilteringIterator; +import ss.cloudbase.core.iterators.CellLevelRecordIterator; +import ss.cloudbase.core.iterators.ConversionIterator; +import ss.cloudbase.core.iterators.GMDenIntersectingIterator; +import ss.cloudbase.core.iterators.SortedMinIterator; +import ss.cloudbase.core.iterators.SortedRangeIterator; +import ss.cloudbase.core.iterators.filter.ogc.OGCFilter; +import cloudbase.core.client.BatchScanner; +import cloudbase.core.client.Connector; +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import 
cloudbase.core.iterators.FilteringIterator; +import cloudbase.core.iterators.filter.RegExFilter; +import cloudbase.core.security.Authorizations; + +public class Main { + private static final Logger logger = Logger.getLogger(Main.class); + + private static String CB_INSTANCE = "INSTANCENAME"; // INSERT INSTANCE NAME + private static String ZK_SERVERS = "r02sv22:2181,r03sv23:2181,r04sv22:2181,r05sv23:2181"; + private static String CB_USER = "user"; // SET USERNAME + private static String CB_PASS = "pass"; // SET PASSWORD + private static String CB_AUTH = "U,FOUO"; + private static String CB_TABLE = "partition_gi"; + + public static void main(String[] args) { + Map<String,String> request = new TreeMap<String, String>(); + + int itrLevel = 50; + + for (String pair: args) { + String[] parts = pair.split("[=]"); + if (parts.length == 1) { + request.put(parts[0], parts[0]); + } else if (parts.length == 2) { + request.put(parts[0], parts[1]); + } + } + + BatchScanner reader = null; + + String filter = request.remove("filter"); + String terms = request.remove("terms"); + String ranges = request.remove("ranges"); + String partition = request.remove("partition"); + String rangeFamily = request.remove("rangeFamily"); + String prefix = request.remove("prefix"); + String index = request.remove("index"); + String test = request.remove("test"); + String testKey = request.remove("testKey"); + String convert = request.remove("convert"); + String grep = request.remove("grep"); + int print = -1; + + try { + print = Integer.parseInt(request.remove("print")); + } catch (NumberFormatException e) { + print = 0; + } + + boolean dryRun = request.remove("dryRun") != null; + boolean debug = request.remove("debug") != null; + boolean startInclusive = request.remove("start") != null; + boolean endInclusive = request.remove("end") != null; + boolean nodoc = request.remove("nodoc") != null; + boolean multiDoc = request.remove("multiDoc") != null; + boolean aggregate = request.remove("aggregate") 
!= null; + + int threads = 5; + if (request.containsKey("threads")) { + threads = Integer.parseInt(request.remove("threads")); + } + + if (partition != null) { + partition = partition.replace(".", "\u0000"); + } + + if (index != null) { + index = index.replace(':', '='); + } + + if (testKey != null) { + testKey = testKey.replace(".", "\u0000"); + } + + if (request.containsKey("c")) { + CB_INSTANCE = request.remove("c"); + } + + if (request.containsKey("z")) { + ZK_SERVERS = request.remove("z"); + } + + if (request.containsKey("u")) { + CB_USER = request.remove("u"); + } + + if (request.containsKey("p")) { + CB_PASS = request.remove("p"); + } + + if (request.containsKey("s")) { + CB_AUTH = request.remove("s"); + } + + if (request.containsKey("t")) { + CB_TABLE = request.remove("t"); + } + + logger.info("Cloudbase Connection: "); + logger.info("\tc (instance):\t" + CB_INSTANCE); + logger.info("\tz (zk servers):\t" + ZK_SERVERS); + logger.info("\tu (user):\t" + CB_USER); + logger.info("\tp (pass):\t" + CB_PASS); + logger.info("\ts (auths):\t" + CB_AUTH); + logger.info("\tt (table):\t" + CB_TABLE); + + logger.info("Query Parameters:"); + logger.info("\tindex:\t\t" + index); + logger.info("\tfilter:\t\t" + filter); + logger.info("\tterms:\t\t" + terms); + logger.info("\tgrep:\t\t" + grep); + logger.info("\tprefix:\t\t" + prefix); + logger.info("\tranges:\t\t" + ranges); + logger.info("\trangeFamily:\t" + rangeFamily); + logger.info("\tpartition:\t" + partition); + logger.info("\tstartInc:\t" + startInclusive); + logger.info("\tendInc:\t\t" + endInclusive); + logger.info("\tthreads:\t" + threads); + logger.info("\tprint:\t\t" + print); + logger.info("\tdryRun:\t\t" + dryRun); + logger.info("\tdebug:\t\t" + debug); + logger.info("\ttestKey:\t" + testKey); + logger.info("\tmultiDoc:\t" + multiDoc); + logger.info("\taggregate:\t" + aggregate); + logger.info("\tconvert:\t" + convert); + + logger.info("Unknown Parameters: "); + for (Entry<String,String> entry: 
request.entrySet()) { + logger.info("\t" + entry.getKey() + ":\t\t" + entry.getValue()); + } + + if (debug) { + // set the cloudbase logging to trace + Logger.getLogger("cloudbase").setLevel(Level.TRACE); + } + + boolean iteratorSet = false; + + try { + ZooKeeperInstance zk = new ZooKeeperInstance(CB_INSTANCE, ZK_SERVERS); + Connector connector = new Connector(zk, CB_USER, CB_PASS.getBytes()); + if (test != null) { + Test t = (Test) Class.forName("dss.webservice.itr.test." + test).newInstance(); + t.runTest(request, connector, CB_TABLE, CB_AUTH); + logger.info("done."); + System.exit(0); + } + reader = connector.createBatchScanner(CB_TABLE, new Authorizations(CB_AUTH.split(",")), threads); + + Set<Range> partitionRanges = new HashSet<Range>(); + if (partition != null) { + partition = partition.replace(".", "\u0000"); + Key startKey = null; + Key endKey = null; + if (partition.contains(",")) { + startKey = new Key(new Text(partition.split(",")[0])); + endKey = new Key(new Text(partition.split(",")[1])); + } else { + startKey = new Key(new Text(partition)); + endKey = startKey.followingKey(PartialKey.ROW); + } + + Range range = new Range(startKey, true, endKey, false); + if (testKey != null) { + Key kTest = new Key(new Text(testKey)); + if (range.contains(kTest)) { + logger.info("Key " + kTest + " is in the current range"); + } else { + logger.info("Key " + kTest + " is not in the current range"); + } + } + partitionRanges.add(range); + } else { + partitionRanges.add(new Range()); + } + + if (terms != null && terms.trim().length() > 0) { + String[] parts = terms.trim().split(","); + if (parts.length == 1) { + logger.info("Creating range iterator from '" + parts[0] + "' to '" + parts[0] + "\\u0000'."); + reader.setScanIterators(itrLevel++, SortedRangeIterator.class.getName(), "ri"); + reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_DOC_COLF, "event"); + reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_COLF, "index"); + 
reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_LOWER_BOUND, parts[0]); + reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_UPPER_BOUND, parts[0] + "\u0000"); + reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + multiDoc); + iteratorSet = true; + } else if (parts.length > 1) { + logger.info("Creating intersecting iterator from all terms"); + Text[] t = new Text[parts.length]; + for (int i = 0; i < parts.length; i++) { + if (parts[i].startsWith("range")) { + parts[i] = parts[i].replace("_", "\u0000"); + } + + t[i] = new Text(parts[i]); + logger.info("Adding Term: " + parts[i]); + } + + reader.setScanIterators(itrLevel++, GMDenIntersectingIterator.class.getName(), "ii"); + reader.setScanIteratorOption("ii", GMDenIntersectingIterator.docFamilyOptionName, "event"); + reader.setScanIteratorOption("ii", GMDenIntersectingIterator.indexFamilyOptionName, "index"); + reader.setScanIteratorOption("ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(t)); + reader.setScanIteratorOption("ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + multiDoc); + iteratorSet = true; + } + } else if (ranges != null && ranges.trim().length() > 0) { + // set up a range iterator + logger.info("Creating range iterator on " + (rangeFamily != null ? rangeFamily: "index") + " for all ranges startInclusive: " + startInclusive + " endInclusive: " + endInclusive); + String[] parts = ranges.trim().split(","); + if (parts.length > 1 && parts.length % 2 == 0) { +// reader.setScanIterators(itrLevel++, RangeIterator.class.getName(), "ri"); +// reader.setScanIteratorOption("ri", RangeIterator.OPTION_INDEX_COLF, rangeFamily != null ? 
rangeFamily: "index"); +// reader.setScanIteratorOption("ri", RangeIterator.OPTION_START_INCLUSIVE, "" + startInclusive); +// reader.setScanIteratorOption("ri", RangeIterator.OPTION_END_INCLUSIVE, "" + endInclusive); +// reader.setScanIteratorOption("ri", RangeIterator.OPTION_RANGES, RangeIterator.encodeRanges(parts)); + + reader.setScanIterators(itrLevel++, SortedRangeIterator.class.getName(), "ir"); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_COLF, rangeFamily != null ? rangeFamily: "index"); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_START_INCLUSIVE, "" + startInclusive); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + endInclusive); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_LOWER_BOUND, parts[0]); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_UPPER_BOUND, parts[1]); + reader.setScanIteratorOption("ir", SortedRangeIterator.OPTION_MULTI_DOC, "" + multiDoc); + iteratorSet = true; + } else { + throw new RuntimeException("A start and end range must be given for each range"); + } + } else if (index != null && index.trim().length() > 0 && partition != null) { + // look for an index on a partition + + // get out the ranges and add the index colf and term colq + Range r = partitionRanges.iterator().next(); + Key start = new Key (r.getStartKey().getRow(), new Text("index"), new Text(index)); + Key end = new Key (r.getStartKey().getRow(), new Text("index"), new Text(index + "\uFFFD")); + partitionRanges.clear(); + partitionRanges.add(new Range(start, true, end, false)); + iteratorSet = true; + + } else if (prefix != null && prefix.trim().length() > 0) { + logger.info("Setting a min iterator on " + prefix); + reader.setScanIterators(itrLevel++, SortedMinIterator.class.getName(), "mi"); + reader.setScanIteratorOption("mi", SortedMinIterator.OPTION_PREFIX, prefix); + reader.setScanIteratorOption("mi", SortedMinIterator.OPTION_MULTI_DOC, "" + multiDoc); + 
iteratorSet = true; + } + + if (aggregate) { + reader.setScanIterators(itrLevel++, CellLevelRecordIterator.class.getName(), "aggregator"); + } + + if (filter != null && filter.trim().length() > 0) { + logger.info("Creating filtering iterator from filter in " + filter); + Scanner scanner = new Scanner(new File(filter)); + + filter = ""; + while (scanner.hasNextLine()) { + filter += scanner.nextLine().trim(); + } + + // set up a filtering iterator + logger.info("Filer = " + filter); + + if (multiDoc && !aggregate) { + reader.setScanIterators(itrLevel++, CellLevelFilteringIterator.class.getName(), "fi"); + reader.setScanIteratorOption("fi", CellLevelFilteringIterator.OPTION_FILTER, filter); + } else { + reader.setScanIterators(itrLevel++, FilteringIterator.class.getName(), "fi"); + reader.setScanIteratorOption("fi", "0", OGCFilter.class.getName()); + reader.setScanIteratorOption("fi", "0." + OGCFilter.OPTION_FILTER, filter); +// reader.setScanIteratorOption("fi", "1", RegExFilter.class.getName()); +// reader.setScanIteratorOption("fi", "1." + RegExFilter.ROW_REGEX, "theRegex"); + } + iteratorSet = true; + } + + if (convert != null && convert.trim().length() > 0) { + convert = convert.replaceAll("_", " "); + String[] conversions = convert.split(","); + reader.setScanIterators(itrLevel++, ConversionIterator.class.getName(), "ci"); + reader.setScanIteratorOption("ci", ConversionIterator.OPTION_CONVERSIONS, ConversionIterator.encodeConversions(conversions)); + reader.setScanIteratorOption("ci", ConversionIterator.OPTION_MULTI_DOC, "" + (multiDoc && ! 
aggregate)); + } + + logger.info("Setting range to: " + partitionRanges.iterator().next()); + reader.setRanges(partitionRanges); + + if (!iteratorSet) { + reader.fetchColumnFamily(new Text("event")); + } + if (!dryRun) { + long start = System.currentTimeMillis(); + int count = 0; + String id = null; + for (Entry<Key, Value> entry: reader) { + count++; + if (print == -1 || count <= print) { + String text = entry.getKey() + "\t" + entry.getValue(); + + if ((grep != null && text.contains(grep)) || grep == null) { + logger.info(text); + } + } + } + reader.close(); + logger.info("Time: " + (System.currentTimeMillis() - start) + " ms"); + logger.info("Count: " + count); + } else if (!iteratorSet) { + logger.info("No iterator was set from the provided parameters (and I'm not doing a full table scan... so there)."); + } else { + logger.info("Dry run complete."); + } + logger.info("Done"); + System.exit(0); + } catch (Exception e) { + logger.error(e, e); + System.exit(1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/src/main/java/dss/webservice/itr/Test.java ---------------------------------------------------------------------- diff --git a/partition/iterator-test/src/main/java/dss/webservice/itr/Test.java b/partition/iterator-test/src/main/java/dss/webservice/itr/Test.java new file mode 100644 index 0000000..0b036b7 --- /dev/null +++ b/partition/iterator-test/src/main/java/dss/webservice/itr/Test.java @@ -0,0 +1,9 @@ +package dss.webservice.itr; + +import java.util.Map; + +import cloudbase.core.client.Connector; + +public interface Test { + void runTest(Map<String, String> request, Connector connector, String table, String auths); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/src/main/java/dss/webservice/itr/test/AddTestRecords.java ---------------------------------------------------------------------- diff --git 
a/partition/iterator-test/src/main/java/dss/webservice/itr/test/AddTestRecords.java b/partition/iterator-test/src/main/java/dss/webservice/itr/test/AddTestRecords.java new file mode 100644 index 0000000..2139528 --- /dev/null +++ b/partition/iterator-test/src/main/java/dss/webservice/itr/test/AddTestRecords.java @@ -0,0 +1,43 @@ +package dss.webservice.itr.test; + +import java.util.Map; + +import org.apache.hadoop.io.Text; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.Connector; +import cloudbase.core.client.MultiTableBatchWriter; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.data.Mutation; +import cloudbase.core.data.Value; +import cloudbase.core.security.ColumnVisibility; +import dss.webservice.itr.Test; + +public class AddTestRecords implements Test { + + @Override + public void runTest(Map<String, String> request, Connector connector, String table, String auths) { + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000000, 500, 4); + try { + BatchWriter writer = mtbw.getBatchWriter(table); + Mutation m = new Mutation(new Text("elint//rdate:79899179//geokey:20223312022200")); + m.put(new Text("event"), new Text("02eacfa1-b548-11df-b72e-002219501672"), new ColumnVisibility(new Text("U&FOUO")), new Value("uuid~event\uFFFD02eacfa1-b548-11df-b72e-002219501672\u0000date\uFFFD20100820\u0000time~dss\uFFFD010226.000\u0000technology\uFFFDelint\u0000feedName\uFFFDParserBinarySpSigFlat\u0000systemName\uFFFDSP\u0000pddg\uFFFDBJ\u0000latitude\uFFFD46.79429069085071\u0000longitude\uFFFD9.852863417535763\u0000altitude\uFFFD1841.0\u0000geoerror~semimajor\uFFFD3709.1270902747297\u0000geoerror~semiminor\uFFFD1896.9438653491684\u0000geoerror~tilt\uFFFD68.68795738630202\u0000frequency\uFFFD\u0000cenot_elnot\uFFFD008LJ\u0000datetime\uFFFD2010-08-20T01:02:26.000Z".getBytes())); + + writer.addMutation(m); + 
mtbw.flush(); + mtbw.close(); + } catch (CBException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (CBSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (TableNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/src/main/java/dss/webservice/itr/test/BaseTileTest.java ---------------------------------------------------------------------- diff --git a/partition/iterator-test/src/main/java/dss/webservice/itr/test/BaseTileTest.java b/partition/iterator-test/src/main/java/dss/webservice/itr/test/BaseTileTest.java new file mode 100644 index 0000000..a8f01a9 --- /dev/null +++ b/partition/iterator-test/src/main/java/dss/webservice/itr/test/BaseTileTest.java @@ -0,0 +1,132 @@ +package dss.webservice.itr.test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import cloudbase.core.client.BatchScanner; +import cloudbase.core.client.Connector; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.security.Authorizations; +import dss.webservice.itr.Test; + +public class BaseTileTest implements Test { + private static final Logger logger = Logger.getLogger(BaseTileTest.class); + + String comboIndexTable = "index_v2"; + String type = "hpcp"; + + @Override + public void runTest(Map<String, String> request, Connector connector, String table, String auths) { + if (!request.containsKey("dates")) { + logger.warn("No 'dates' parameter supplied. e.g. 
dates=20100720,20100721..."); + return; + } + + if (request.containsKey("type")) { + type = request.get("type"); + } + + String[] dates = request.get("dates").split(","); + + List<Long> comboTimes = new ArrayList<Long>(); + List<Long> partTimes = new ArrayList<Long>(); + List<Long> comboCounts = new ArrayList<Long>(); + List<Long> partCounts = new ArrayList<Long>(); + List<String> errors = new ArrayList<String>(); + try { + for (String date: dates) { + long rdate = 99999999 - Long.parseLong(date); + for (int g = 0; g < 8; g++) { + String begin = type + "//rdate:" + rdate + "//geokey:" + g; + String end = type + "//rdate:" + rdate + "//geokey:" + (g+1); + long count = 0; + Set<Range> ranges = new HashSet<Range>(); + + logger.info("Running test for " + begin + " ..."); + // run combo index test + BatchScanner reader = connector.createBatchScanner(table, new Authorizations(auths.split(",")), 30); + ranges.add(new Range(new Key(new Text(begin)), true, new Key(new Text(end)), false)); + + reader.setRanges(ranges); + long start = System.currentTimeMillis(); + for (Entry<Key, Value> entry: reader) { + count++; + } + comboTimes.add(System.currentTimeMillis() - start); + comboCounts.add(count); + + logger.info("\tC count=" + count + " time=" + comboTimes.get(comboTimes.size() - 1) + " ms"); + + count = 0; + + // run partition index test +// reader = connector.createBatchScanner(table, new Authorizations(auths.split(",")), 30); +// +// reader.setScanIterators(3, SortedRangeIterator.class.getName(), "ri"); +// reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_LOWER_BOUND, begin.replace("geokey", "geoKey")); +// reader.setScanIteratorOption("ri", SortedRangeIterator.OPTION_UPPER_BOUND, end.replace("geokey", "geoKey")); +// +// ranges.clear(); +// ranges.add(new Range(new Key(new Text("date:" + date)), true, new Key(new Text("date:" + date + "z")), false)); +// reader.setRanges(ranges); +// +// start = System.currentTimeMillis(); +// for (Entry<Key, Value> entry: 
reader) { +// count++; +// } +// partTimes.add(System.currentTimeMillis() - start); +// partCounts.add(count); +// +// if (count != comboCounts.get(comboCounts.size() - 1)) { +// String msg = "Counts differed for " + begin + " C: " + comboCounts.get(comboCounts.size() - 1) + " P: " + count; +// logger.warn(msg); +// errors.add(msg); +// } +// logger.info("\tP count=" + count + " time=" + partTimes.get(partTimes.size() - 1) + " ms"); + } + } + + logger.info("********************* RESULTS *********************"); + logger.info("Tested all 0 level tiles on " + type + " for " + request.get("dates")); + //logger.info("This is a test of SortedRangeIterator performance"); + + double comboSum = 0, partSum = 0; + for (int i = 0; i < comboTimes.size(); i++) { + comboSum += comboTimes.get(i); + //partSum += partTimes.get(i); + } + + logger.info("Average C Time: " + (comboSum / comboTimes.size()) + " ms"); + //logger.info("Average P Time: " + (partSum / partTimes.size()) + " ms"); + + comboSum = 0; + partSum = 0; + + for (int i = 0; i < comboCounts.size(); i++) { + comboSum += comboCounts.get(i); + //partSum += partCounts.get(i); + } + + logger.info("Average C Count: " + (comboSum / comboCounts.size())); + //logger.info("Average P Count: " + (partSum / partCounts.size())); + + if (errors.size() > 0) { + logger.warn("ERRORS!!!:"); + for (String e: errors) { + logger.warn(e); + } + } + } catch (Exception e) { + logger.error(e, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/src/main/java/dss/webservice/itr/test/ConversionTest.java ---------------------------------------------------------------------- diff --git a/partition/iterator-test/src/main/java/dss/webservice/itr/test/ConversionTest.java b/partition/iterator-test/src/main/java/dss/webservice/itr/test/ConversionTest.java new file mode 100644 index 0000000..01abb31 --- /dev/null +++ b/partition/iterator-test/src/main/java/dss/webservice/itr/test/ConversionTest.java @@ 
-0,0 +1,159 @@ +package dss.webservice.itr.test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import ss.cloudbase.core.iterators.ConversionIterator; +import cloudbase.core.client.BatchScanner; +import cloudbase.core.client.Connector; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.security.Authorizations; +import dss.webservice.itr.Test; + +public class ConversionTest implements Test { + private static final Logger logger = Logger.getLogger(ConversionTest.class); + + String comboIndexTable = "index_v3"; + String type = "hpcp"; + + @Override + public void runTest(Map<String, String> request, Connector connector, String table, String auths) { + if (!request.containsKey("dates")) { + logger.warn("No 'dates' parameter supplied. e.g. 
dates=20100720,20100721..."); + return; + } + + if (request.containsKey("type")) { + type = request.get("type"); + } + + int threads = 5; + if (request.containsKey("threads")) { + threads = Integer.parseInt(request.remove("threads")); + } + + String[] dates = request.get("dates").split(","); + + List<Long> baseTimes = new ArrayList<Long>(); + List<Long> convertTimes = new ArrayList<Long>(); + List<Long> baseCounts = new ArrayList<Long>(); + List<Long> convertCounts = new ArrayList<Long>(); + List<String> errors = new ArrayList<String>(); + + List<Value> values = new ArrayList<Value>(); + + try { + for (String date: dates) { + long rdate = 99999999 - Long.parseLong(date); + for (int g = 0; g < 8; g++) { + String begin = type + "//rdate:" + rdate + "//geokey:" + g; + String end = type + "//rdate:" + rdate + "//geokey:" + (g+1); + long count = 0; + Set<Range> ranges = new HashSet<Range>(); + + logger.info("Running test for " + begin + " ..."); + // run combo index test + BatchScanner reader = connector.createBatchScanner(table, new Authorizations(auths.split(",")), threads); + ranges.add(new Range(new Key(new Text(begin)), true, new Key(new Text(end)), false)); + + reader.setRanges(ranges); + values.clear(); + long start = System.currentTimeMillis(); + for (Entry<Key, Value> entry: reader) { + values.add(entry.getValue()); + count++; + } + baseTimes.add(System.currentTimeMillis() - start); + baseCounts.add(count); + + logger.info("\tBase count=" + count + " time=" + baseTimes.get(baseTimes.size() - 1) + " ms"); + + count = 0; + for (Value value: values) { + logger.info("\t" + value.toString()); + count++; + if (count == 2) { + break; + } + } + + count = 0; + values.clear(); + + reader = connector.createBatchScanner(table, new Authorizations(auths.split(",")), threads); + ranges.add(new Range(new Key(new Text(begin)), true, new Key(new Text(end)), false)); + + reader.setScanIterators(50, ConversionIterator.class.getName(), "ci"); + reader.setScanIteratorOption("ci", 
ConversionIterator.OPTION_CONVERSIONS, ConversionIterator.encodeConversions(new String[] { + "frequency / 1000000" + })); + + reader.setRanges(ranges); + values.clear(); + start = System.currentTimeMillis(); + for (Entry<Key, Value> entry: reader) { + values.add(entry.getValue()); + count++; + } + + convertTimes.add(System.currentTimeMillis() - start); + convertCounts.add(count); + + logger.info("\tConvert count=" + count + " time=" + convertTimes.get(convertTimes.size() - 1) + " ms"); + + count = 0; + for (Value value: values) { + logger.info("\t" + value.toString()); + count++; + if (count == 2) { + break; + } + } + } + } + + logger.info("********************* RESULTS *********************"); + logger.info("Tested all 0 level tiles on " + type + " for " + request.get("dates")); + logger.info("This is a test of ConversionIterator performance"); + + double baseSum = 0, convertSum = 0; + for (int i = 0; i < baseTimes.size(); i++) { + baseSum += baseTimes.get(i); + convertSum += convertTimes.get(i); + } + + logger.info("Average Base Time: " + (baseSum / baseTimes.size()) + " ms"); + logger.info("Average Convert Time: " + (convertSum / convertTimes.size()) + " ms"); + + baseSum = 0; + convertSum = 0; + + for (int i = 0; i < baseCounts.size(); i++) { + baseSum += baseCounts.get(i); + convertSum += convertCounts.get(i); + } + + logger.info("Average Base Count: " + (baseSum / baseCounts.size())); + logger.info("Average Convert Count: " + (convertSum / convertCounts.size())); + + if (errors.size() > 0) { + logger.warn("ERRORS!!!:"); + for (String e: errors) { + logger.warn(e); + } + } + } catch (Exception e) { + logger.error(e, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/iterator-test/test.sh ---------------------------------------------------------------------- diff --git a/partition/iterator-test/test.sh b/partition/iterator-test/test.sh new file mode 100644 index 0000000..5a6cf1f --- /dev/null +++ 
b/partition/iterator-test/test.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +java -jar target/iterator-test-0.2.0-SNAPSHOT.jar z=localhost t=test $@ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/pom.xml ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/pom.xml b/partition/mr.partition.rdf/pom.xml new file mode 100644 index 0000000..bb1f0b2 --- /dev/null +++ b/partition/mr.partition.rdf/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>parent</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>mvm.mmrts.rdf</groupId> + <artifactId>mr.partition.rdf</artifactId> + <version>1.0.0-SNAPSHOT</version> + <name>${project.groupId}.${project.artifactId}</name> + + <dependencies> + <dependency> + <groupId>mvm.mmrts.rdf</groupId> + <artifactId>partition.rdf</artifactId> + <version>1.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-sparql</artifactId> + <version>${openrdf.sesame.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>0.20.2</version> + </dependency> + <dependency> + <groupId>org.apache.mrunit</groupId> + <artifactId>mrunit</artifactId> + <version>0.5.0-incubating</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <!-- NOTE: We don't need a groupId specification because the group is + org.apache.maven.plugins ...which is assumed by default. 
--> + <artifactId>maven-assembly-plugin</artifactId> + <dependencies> + <dependency> + <groupId>mvm.cloud</groupId> + <artifactId>hadoop-job-assembly</artifactId> + <version>1.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <attach>false</attach> + <descriptors> + <descriptor>assemblies/job.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/assembly/job.xml ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/assembly/job.xml b/partition/mr.partition.rdf/src/main/assembly/job.xml new file mode 100644 index 0000000..259b917 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/assembly/job.xml @@ -0,0 +1,38 @@ +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>job</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <unpack>false</unpack> + <scope>runtime</scope> + <outputDirectory>lib</outputDirectory> + <excludes> + <exclude>org.apache.hadoop:hadoop-core</exclude> + <exclude>${artifact.groupId}:${artifact.artifactId}</exclude> + </excludes> + </dependencySet> + <dependencySet> + <unpack>false</unpack> + <scope>system</scope> + 
<outputDirectory>lib</outputDirectory> + <excludes> + <exclude>${artifact.groupId}:${artifact.artifactId}</exclude> + </excludes> + </dependencySet> + </dependencySets> + <fileSets> + <fileSet> + <directory>${basedir}/target/classes</directory> + <outputDirectory>/</outputDirectory> + <excludes> + <exclude>*.jar</exclude> + </excludes> + </fileSet> + </fileSets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy b/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy new file mode 100644 index 0000000..e5e02ec --- /dev/null +++ b/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy @@ -0,0 +1,33 @@ +import org.openrdf.rio.rdfxml.* +import org.openrdf.rio.ntriples.NTriplesWriterFactory +import org.openrdf.rio.RDFHandler + +@Grab(group='com.google.guava', module='guava', version='r06') +@Grab(group='org.openrdf.sesame', module='sesame-rio-rdfxml', version='2.3.2') +@Grab(group='org.openrdf.sesame', module='sesame-rio-ntriples', version='2.3.2') +@Grab(group='org.slf4j', module='slf4j-simple', version='1.5.8') +def convertDirRdfFormat(def dir, def outputFile) { + //read each file + assert dir.isDirectory() + + def ntriplesWriter = NTriplesWriterFactory.newInstance().getWriter(new FileOutputStream(outputFile)) + + ntriplesWriter.startRDF() + dir.listFiles().each { it -> + //load file into rdfxml parser + def rdfxmlParser = RDFXMLParserFactory.newInstance().getParser() + rdfxmlParser.setRDFHandler( + [ startRDF: {}, + endRDF: {}, + handleNamespace: { def prefix, def uri -> ntriplesWriter.handleNamespace(prefix, uri)}, + handleComment: {}, + handleStatement: { def stmt -> ntriplesWriter.handleStatement stmt}] as RDFHandler + ) + rdfxmlParser.parse(new FileInputStream(it), "") + } + 
ntriplesWriter.endRDF() +} + +try{ +convertDirRdfFormat(new File(args[0]), new File(args[1])) +}catch(Exception e) {e.printStackTrace();} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java new file mode 100644 index 0000000..e8b2e5a --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java @@ -0,0 +1,104 @@ +package mvm.mmrts.rdf.partition.mr; + +import com.google.common.io.ByteStreams; +import mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFJob; + +import java.io.FileInputStream; + +/** + * Class MrTstBed + * Date: Sep 1, 2011 + * Time: 9:18:53 AM + */ +public class MrTstBed { + public static void main(String[] args) { + try { +// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" + +// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + +// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + +// "SELECT * WHERE\n" + +// "{\n" + +// "?id tdp:reportedAt ?timestamp. 
\n" + +// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314898074000 , 1314898374000 , 'XMLDATETIME')).\n" + +// "?id tdp:performedBy ?system.\n" + +// "?id <http://here/2010/cmv/ns#hasMarkingText> \"U\".\n" + +// "?id rdf:type tdp:Sent.\n" + +// "} \n"; + + FileInputStream fis = new FileInputStream(args[0]); + String query = new String(ByteStreams.toByteArray(fis)); + fis.close(); + +// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" + +// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + +// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + +// "SELECT * WHERE\n" + +// "{\n" + +// "?id tdp:reportedAt ?timestamp.\n" + +// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314381770000 , 1314381880000 , 'XMLDATETIME')).\n" + +// "?id tdp:performedBy ?system.\n" + +// "}"; + + new SparqlCloudbaseIFJob("partitionRdf", "root", "password", "stratus", "stratus13:2181", "/temp/queryout", MrTstBed.class, query).run(); + +// QueryParser parser = (new SPARQLParserFactory()).getParser(); +// TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr(); +// System.out.println(expr); +// +// final Configuration queryConf = new Configuration(); +// expr.visit(new FilterTimeIndexVisitor(queryConf)); +// +// (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null); +// +// System.out.println(expr); +// +// //make sure of only one shardlookup +// expr.visit(new QueryModelVisitorBase<RuntimeException>() { +// int count = 0; +// +// @Override +// public void meetOther(QueryModelNode node) throws RuntimeException { +// super.meetOther(node); +// count++; +// if (count > 1) +// throw new IllegalArgumentException("Query can only have one subject-star lookup"); +// } +// }); +// +// final Job job = new Job(queryConf); +// job.setJarByClass(MrTstBed.class); +// +// expr.visit(new QueryModelVisitorBase<RuntimeException>() { +// @Override +// public void meetOther(QueryModelNode 
node) throws RuntimeException { +// super.meetOther(node); +// +// //set up CloudbaseBatchScannerInputFormat here +// if (node instanceof ShardSubjectLookup) { +// System.out.println("Lookup: " + node); +// try { +// new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, "partitionRdf", +// "root", "password", "stratus", "stratus13:2181"); +// } catch (QueryEvaluationException e) { +// e.printStackTrace(); +// } +// } +// } +// }); +// +// Path outputDir = new Path("/temp/sparql-out/testout"); +// FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf); +// if (dfs.exists(outputDir)) +// dfs.delete(outputDir, true); +// +// FileOutputFormat.setOutputPath(job, outputDir); +// +// // Submit the job +// Date startTime = new Date(); +// System.out.println("Job started: " + startTime); +// job.waitForCompletion(true); + } catch (Exception e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java new file mode 100644 index 0000000..15c9c79 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java @@ -0,0 +1,411 @@ +package mvm.mmrts.rdf.partition.mr; + +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.util.ArgumentChecker; +import mvm.mmrts.rdf.partition.PartitionSail; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; 
+import org.apache.hadoop.mapreduce.*; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.*; +import org.openrdf.repository.Repository; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; + +/** + * Class SparqlPartitionStoreInputFormat + * Date: Oct 28, 2010 + * Time: 11:48:17 AM + */ +public class SparqlPartitionStoreInputFormat extends InputFormat<LongWritable, MapWritable> { + + public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.sparqlinputformat"; + public static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; + public static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; + public static final String USERNAME = PREFIX + ".username"; + public static final String PASSWORD = PREFIX + ".password"; + + public static final String INSTANCE_NAME = PREFIX + ".instanceName"; + public static final String ZK = PREFIX + ".zk"; + + public static final String STARTTIME = PREFIX + ".starttime"; + public static final String ENDTIME = PREFIX + ".endtime"; + public static final String TABLE = PREFIX + ".table"; + public static final String SHARD_TABLE = PREFIX + ".shardtable"; + public static final String SPARQL_QUERIES_PROP = PREFIX + ".sparql"; + public static final String MR_NUMTHREADS_PROP = PREFIX + ".numthreads"; +// public static final String RANGE_PROP = PREFIX + ".range"; +// public static final String NUM_RANGES_PROP = PREFIX + 
".numranges"; +// public static final String TABLE_PREFIX_PROP = PREFIX + ".tablePrefix"; +// public static final String OFFSET_RANGE_PROP = PREFIX + ".offsetrange"; + +// public static final String INFER_PROP = PREFIX + ".infer"; + + private static final String UTF_8 = "UTF-8"; + + private static final ValueFactory vf = ValueFactoryImpl.getInstance(); + + static class SparqlInputSplit extends InputSplit implements Writable { + + protected String sparql; + protected String startTime; + protected String endTime; + protected String table; +// private Long offset; +// private Long limit; + + private SparqlInputSplit() { + } + + private SparqlInputSplit(String sparql, String startTime, String endTime, String table) { + this.sparql = sparql; + this.startTime = startTime; + this.endTime = endTime; + this.table = table; +// this.offset = offset; +// this.limit = limit; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{sparql}; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + boolean startTimeExists = startTime != null; + dataOutput.writeBoolean(startTimeExists); + if (startTimeExists) + dataOutput.writeUTF(startTime); + + boolean endTimeExists = endTime != null; + dataOutput.writeBoolean(endTimeExists); + if (endTimeExists) + dataOutput.writeUTF(endTime); + + dataOutput.writeUTF(table); + dataOutput.writeUTF(sparql); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + if (dataInput.readBoolean()) + this.startTime = dataInput.readUTF(); + if (dataInput.readBoolean()) + this.endTime = dataInput.readUTF(); + this.table = dataInput.readUTF(); + this.sparql = dataInput.readUTF(); + } + } + + /** + * Create a SparqlInputSplit for every sparql query.<br> + * Separate a single sparql query into numRanges of time ranges. 
For example, + * a numRange of 3, with range of 1 day (ms), and 1 query, will have 3 input splits + * with the same query, however the first range will go from now to a day before, the second + * will go from the day before to the day before that, the third will go from the two days + * ago to forever back. + * <br><br> + * If the numRanges is not set, or set to 1, the inputsplit can only focus on a certain startTime, + * ttl. If these are not set, then look at all time. + * + * @param job + * @return + * @throws java.io.IOException + * @throws InterruptedException + */ + @Override + public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { + validateOptions(job.getConfiguration()); + final Collection<String> queries = getSparqlQueries(job.getConfiguration()); + if (queries == null || queries.size() == 0) + throw new IOException("Queries cannot be null or empty"); + + String startTime_s = getStartTime(job.getConfiguration()); + String endTime_s = getEndTime(job.getConfiguration()); + + List<InputSplit> splits = new ArrayList<InputSplit>(); + for (String query : queries) { + splits.add(new SparqlInputSplit(query, startTime_s, endTime_s, getTable(job.getConfiguration()))); + } + return splits; + } + + @Override + public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new SparqlResultsRecordReader(taskAttemptContext.getConfiguration()); + } + + protected static String getUsername(Configuration conf) { + return conf.get(USERNAME); + } + + /** + * WARNING: The password is stored in the Configuration and shared with all + * MapReduce tasks; It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. 
+ */ + protected static String getPassword(Configuration conf) { + return new String(Base64.decodeBase64(conf.get(PASSWORD, "").getBytes())); + } + + protected static String getInstance(Configuration conf) { + return conf.get(INSTANCE_NAME); + } + + public static void setSparqlQueries(JobContext job, String... queries) { + if (queries == null || queries.length == 0) + throw new IllegalArgumentException("Queries cannot be null or empty"); + + final Configuration conf = job.getConfiguration(); + setSparqlQueries(conf, queries); + } + + public static void setSparqlQueries(Configuration conf, String... queries) { + try { + Collection<String> qencs = new ArrayList<String>(); + for (String query : queries) { + final String qenc = URLEncoder.encode(query, UTF_8); + qencs.add(qenc); + } + conf.setStrings(SPARQL_QUERIES_PROP, qencs.toArray(new String[qencs.size()])); + } catch (UnsupportedEncodingException e) { + //what to do... + e.printStackTrace(); + } + } + + public static Collection<String> getSparqlQueries(Configuration conf) { + Collection<String> queries = new ArrayList<String>(); + final Collection<String> qencs = conf.getStringCollection(SPARQL_QUERIES_PROP); + for (String qenc : qencs) { + queries.add(qenc); + } + return queries; + } + + public static void setLongJob(JobContext job, Long time) { + Configuration conf = job.getConfiguration(); + //need to make the runtime longer, default 30 min + time = (time == null) ? 
1800000 : time; + conf.setLong("mapreduce.tasktracker.healthchecker.script.timeout", time); + conf.set("mapred.child.java.opts", "-Xmx1G"); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + } + + public static void setInputInfo(JobContext job, String user, byte[] passwd) { + Configuration conf = job.getConfiguration(); + if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) + throw new IllegalStateException("Input info can only be set once per job"); + conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true); + + ArgumentChecker.notNull(user, passwd); + conf.set(USERNAME, user); + conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); + } + + public static void setEndTime(JobContext job, String endTime) { + Configuration conf = job.getConfiguration(); + conf.set(ENDTIME, endTime); + } + + public static String getEndTime(Configuration conf) { + return conf.get(ENDTIME); + } + + public static void setNumThreads(JobContext job, int numThreads) { + Configuration conf = job.getConfiguration(); + conf.setInt(MR_NUMTHREADS_PROP, numThreads); + } + + public static int getNumThreads(Configuration conf) { + return conf.getInt(MR_NUMTHREADS_PROP, -1); + } + + public static void setTable(JobContext job, String table) { + Configuration conf = job.getConfiguration(); + conf.set(TABLE, table); + } + + public static String getTable(Configuration conf) { + return conf.get(TABLE); + } + + public static void setShardTable(JobContext job, String table) { + Configuration conf = job.getConfiguration(); + conf.set(SHARD_TABLE, table); + } + + public static String getShardTable(Configuration conf) { + String t = conf.get(SHARD_TABLE); + return (t != null) ? 
t : getTable(conf); + } + + public static void setStartTime(JobContext job, String startTime) { + Configuration conf = job.getConfiguration(); + conf.set(STARTTIME, startTime); + } + + public static String getStartTime(Configuration conf) { + return conf.get(STARTTIME); + } + + public static void setZooKeeperInstance(JobContext job, String instanceName, String zk) { + Configuration conf = job.getConfiguration(); + if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) + throw new IllegalStateException("Instance info can only be set once per job"); + conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); + + ArgumentChecker.notNull(instanceName, zk); + conf.set(INSTANCE_NAME, instanceName); + conf.set(ZK, zk); + } + + protected static void validateOptions(Configuration conf) throws IOException { + if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) + throw new IOException("Input info has not been set."); + if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) + throw new IOException("Instance info has not been set."); + if (conf.getStrings(SPARQL_QUERIES_PROP) == null) + throw new IOException("Sparql queries have not been set."); + } + + private class SparqlResultsRecordReader extends RecordReader<LongWritable, MapWritable> +// implements TupleQueryResultWriter, Runnable + { + + boolean closed = false; + long count = 0; + BlockingQueue<MapWritable> queue = new LinkedBlockingQueue<MapWritable>(); + private Repository repo; + String query; + + Configuration conf; + private TupleQueryResult result; + private RepositoryConnection conn; + + public SparqlResultsRecordReader(Configuration conf) { + this.conf = conf; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + + try { + validateOptions(conf); + + SparqlInputSplit sis = (SparqlInputSplit) inputSplit; + this.query = sis.sparql; + + // init RdfCloudTripleStore + final PartitionSail store = new PartitionSail(new 
ZooKeeperInstance(getInstance(conf), + conf.get(ZK)).getConnector(getUsername(conf), getPassword(conf).getBytes()), getTable(conf), getShardTable(conf)); + + repo = new SailRepository(store); + repo.initialize(); + + conn = repo.getConnection(); + query = URLDecoder.decode(query, UTF_8); + TupleQuery tupleQuery = conn.prepareTupleQuery( + QueryLanguage.SPARQL, query); + + if (sis.startTime != null && sis.endTime != null) { + tupleQuery.setBinding(START_BINDING, vf.createLiteral(sis.startTime)); + tupleQuery.setBinding(END_BINDING, vf.createLiteral(sis.endTime)); + } + + int threads = getNumThreads(conf); + if (threads > 0) { + tupleQuery.setBinding(NUMTHREADS_PROP, vf.createLiteral(threads)); + } + + result = tupleQuery.evaluate(); + } catch (Exception e) { + throw new IOException("Exception occurred opening Repository", e); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + try { + return result.hasNext(); + } catch (QueryEvaluationException e) { + throw new IOException(e); + } +// return false; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return new LongWritable(count++); + } + + @Override + public MapWritable getCurrentValue() throws IOException, InterruptedException { + try { + if (result.hasNext()) { + BindingSet bindingSet = result.next(); + return transformRow(bindingSet); + } + return null; + } catch (QueryEvaluationException e) { + throw new IOException(e); + } + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return (closed) ? 
(1) : (0); + } + + @Override + public void close() throws IOException { + closed = true; + try { + conn.close(); + repo.shutDown(); + } catch (RepositoryException e) { + throw new IOException("Exception occurred closing Repository", e); + } + } + + MapWritable mw = new MapWritable(); + + protected MapWritable transformRow(BindingSet bindingSet) { + mw.clear(); //handle the case of optional bindings. -mbraun + for (String name : bindingSet.getBindingNames()) { + final Text key = new Text(name); + final Text value = new Text(bindingSet.getValue(name).stringValue()); + mw.put(key, value); + } + return mw; + } + } +}
