Hi Arun, Did you ever find a solution to this one? Probably a good place to start is the logs from your MR job while the slowdown is happening.
-n On Sun, Dec 6, 2015 at 11:45 PM, Thangamani, Arun <[email protected]> wrote: > Hello, I noticed an issue with bulk insert through map reduce in phoenix > 4.4.0.2.3.0.0-2557, using outline of the code below > > Normally the inserts of about 25 million rows complete in about 5 mins, I > have 5 region servers and the phoenix table has 32 buckets > But sometimes (maybe after major compactions or region movement?), writes > simply slow down to 90 mins, when I truncate SYSTEM.STATS hbase table, the > inserts get a little faster (60 mins), but when I truncate both > SYSTEM.CATALOG & SYSTEM.STATS tables, and recreate the phoenix table def(s) > the inserts go back to 5 mins, the workaround of truncating SYSTEM tables > is not sustainable for long, can someone help and let me know if there is a > patch available for this? Thanks in advance for the help. > > Job job = Job.getInstance(conf, NAME); > // Set the target Phoenix table and the columns > PhoenixMapReduceUtil.setOutput(job, tableName, > "WEB_ID,WEB_PAGE_LABEL,DEVICE_TYPE," + > > "WIDGET_INSTANCE_ID,WIDGET_TYPE,WIDGET_VERSION,WIDGET_CONTEXT," + > > "TOTAL_CLICKS,TOTAL_CLICK_VIEWS,TOTAL_HOVER_TIME_MS,TOTAL_TIME_ON_PAGE_MS,TOTAL_VIEWABLE_TIME_MS," > + > > "VIEW_COUNT,USER_SEGMENT,DIM_DATE_KEY,VIEW_DATE,VIEW_DATE_TIMESTAMP,ROW_NUMBER"); > FileInputFormat.setInputPaths(job, inputPath); > job.setMapperClass(WidgetPhoenixMapper.class); > job.setMapOutputKeyClass(NullWritable.class); > job.setMapOutputValueClass(WidgetPagesStatsWritable.class); > job.setOutputFormatClass(PhoenixOutputFormat.class); > TableMapReduceUtil.addDependencyJars(job); > job.setNumReduceTasks(0); > job.waitForCompletion(true); > > public static class WidgetPhoenixMapper extends Mapper<LongWritable, Text, > NullWritable, WidgetPagesStatsWritable> { > @Override > public void map(LongWritable longWritable, Text text, Context context) > throws IOException, InterruptedException { > Configuration conf = context.getConfiguration(); > String rundateString = conf.get("rundate"); > PagesSegmentWidgetLineParser parser = new > PagesSegmentWidgetLineParser(); > try { > PagesSegmentWidget pagesSegmentWidget = > parser.parse(text.toString()); > > if (pagesSegmentWidget != null) { > WidgetPagesStatsWritable widgetPagesStatsWritable = new > WidgetPagesStatsWritable(); > WidgetPagesStats widgetPagesStats = new WidgetPagesStats(); > > widgetPagesStats.setWebId(pagesSegmentWidget.getWebId()); > > widgetPagesStats.setWebPageLabel(pagesSegmentWidget.getWebPageLabel()); > > widgetPagesStats.setWidgetInstanceId(pagesSegmentWidget.getWidgetInstanceId()); > ….. > > > widgetPagesStatsWritable.setWidgetPagesStats(widgetPagesStats); > context.write(NullWritable.get(), > widgetPagesStatsWritable); > } > > }catch (Exception e){ > e.printStackTrace(); > } > } > } > > public final class WidgetPagesStats { > private String webId; > private String webPageLabel; > private long widgetInstanceId; > private String widgetType; > > … > @Override > public boolean equals(Object o) { > > .. > } > @Override > public int hashCode() { > > .. > } > @Override > public String toString() { > return "WidgetPhoenix{“…. > '}'; > } > } > > public class WidgetPagesStatsWritable implements DBWritable, Writable { > > private WidgetPagesStats widgetPagesStats; > > public void readFields(DataInput input) throws IOException { > widgetPagesStats.setWebId(input.readLine()); > widgetPagesStats.setWebPageLabel(input.readLine()); > widgetPagesStats.setWidgetInstanceId(input.readLong()); > widgetPagesStats.setWidgetType(input.readLine()); > > … > } > > public void write(DataOutput output) throws IOException { > output.writeBytes(widgetPagesStats.getWebId()); > output.writeBytes(widgetPagesStats.getWebPageLabel()); > > output.writeLong(widgetPagesStats.getWidgetInstanceId()); > output.writeBytes(widgetPagesStats.getWidgetType()); > > .. > } > > public void readFields(ResultSet rs) throws SQLException { > widgetPagesStats.setWebId(rs.getString("WEB_ID")); > widgetPagesStats.setWebPageLabel(rs.getString("WEB_PAGE_LABEL")); > > widgetPagesStats.setWidgetInstanceId(rs.getLong("WIDGET_INSTANCE_ID")); > widgetPagesStats.setWidgetType(rs.getString("WIDGET_TYPE")); > > … > } > > public void write(PreparedStatement pstmt) throws SQLException { > Connection connection = pstmt.getConnection(); > PhoenixConnection phoenixConnection = (PhoenixConnection) > connection; > //connection.getClientInfo().setProperty("scn", > Long.toString(widgetPhoenix.getViewDateTimestamp())); > > pstmt.setString(1, widgetPagesStats.getWebId()); > pstmt.setString(2, widgetPagesStats.getWebPageLabel()); > pstmt.setString(3, widgetPagesStats.getDeviceType()); > > pstmt.setLong(4, widgetPagesStats.getWidgetInstanceId()); > > … > } > > public WidgetPagesStats getWidgetPagesStats() { > return widgetPagesStats; > } > > public void setWidgetPagesStats(WidgetPagesStats widgetPagesStats) { > this.widgetPagesStats = widgetPagesStats; > } > } > > > ---------------------------------------------------------------------- > This message and any attachments are intended only for the use of the > addressee and may contain information that is privileged and confidential. > If the reader of the message is not the intended recipient or an authorized > representative of the intended recipient, you are hereby notified that any > dissemination of this communication is strictly prohibited. If you have > received this communication in error, notify the sender immediately by > return email and delete the message and any attachments from your system. >
