Hi Yan,

Parquet's writer enforces a limit of 3x the row group size on the tracked allocated memory per file. There may be some per-column overhead that isn't tracked there, but I think it's reasonable to say that with a 3GB heap, you probably aren't running out of memory because of the number of columns.

What we usually see causing out of memory issues is writing to multiple files at the same time. For example, if you're writing to a partitioned dataset you might be writing records to 10 partitions. If you have 10 files open at once, then a 256MB row group for each file requires 2.5GB total. The solution is to write only one Parquet file (partition) at a time and close it before you move to the next file. This usually requires an extra reduce round in your workflow to shuffle the records so they can be written this way.
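
Just to illustrate the pattern (this is a rough sketch, not your actual pipeline; the partition list, the recordsFor helper, the paths, and the AvroParquetWriter setup below are stand-ins for whatever you really use):

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.avro.AvroParquetWriter;
   import org.apache.parquet.hadoop.ParquetWriter;

   public class OnePartitionAtATime {
     // Write each partition's records with only one writer open at a time,
     // so only one row group buffer is ever held in memory.
     static void writePartitions(Path outputDir, Schema schema,
                                 Iterable<String> partitions) throws Exception {
       for (String partition : partitions) {
         Path file = new Path(outputDir, "market=" + partition + "/part-00000.parquet");
         try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
             .<GenericRecord>builder(file)
             .withSchema(schema)
             .withRowGroupSize(256 * 1024 * 1024)  // 256MB buffered per open file
             .build()) {
           for (GenericRecord record : recordsFor(partition)) {
             writer.write(record);
           }
         }  // writer closed here, so its buffer is released before the next file opens
       }
     }

     // Placeholder for however your job supplies the records for one partition.
     static Iterable<GenericRecord> recordsFor(String partition) {
       throw new UnsupportedOperationException("stand-in for your record source");
     }
   }

The point is just the ordering: group or sort the records by partition first, then each writer is opened, filled, and closed before the next one starts.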

What are you using to write the data? That will determine how you add the shuffle I'm talking about.

rb

On 01/07/2016 11:38 AM, Yan Qi wrote:
Sure, it is possible to change the row group size and the other settings. Right now
we are setting parquet-block-size to 256M, page-size to 1M, and giving ~3G for Xmx.

My question is not directly about the sizes, though, since conceptually we can
always solve the problem by adding more memory. I am trying to figure out
the right WAY to define the schema, because our JVM memory is limited (<5G)
and too small a Parquet block size can compromise the columnar storage
benefits. It is also possible that we will add more 'MARKET's in the future,
making the number of table columns even larger. Therefore we need a concrete
idea of the memory consumed by Parquet itself (e.g., I suppose Parquet keeps
an internal structure for the table schema).

Any suggestions?

Thanks,
Yan

On Thu, Jan 7, 2016 at 11:14 AM, Reuben Kuhnert <[email protected]> wrote:

Hi again Yan,

Sorry about the late reply. The *ParquetOutputFormat* class has a number of
setters:

   public static void setBlockSize(Job job, int blockSize) {
     getConfiguration(job).setInt(BLOCK_SIZE, blockSize);
   }

   public static void setPageSize(Job job, int pageSize) {
     getConfiguration(job).setInt(PAGE_SIZE, pageSize);
   }

   public static void setDictionaryPageSize(Job job, int pageSize) {
     getConfiguration(job).setInt(DICTIONARY_PAGE_SIZE, pageSize);
   }

   public static void setCompression(Job job, CompressionCodecName compression) {
     getConfiguration(job).set(COMPRESSION, compression.name());
   }

   public static void setEnableDictionary(Job job, boolean enableDictionary) {
     getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary);
   }

These allow you to set the row group size (i.e. the block size) and page size,
which determine how much data is written out per block (and, transitively,
how much data is retained in memory before a flush). Try setting these to,
say, 128 MB for a block and 1 MB for a page to test. If that doesn't work,
can you let us know what sizes you're currently using (via the associated
getters, also on ParquetOutputFormat)?
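
For example, a quick sketch of wiring those in (the AvroParquetOutputFormat and
the job setup here are assumptions about your pipeline; adjust to whatever you
already have):

   Job job = Job.getInstance(new Configuration(), "profile-to-parquet");
   job.setOutputFormatClass(AvroParquetOutputFormat.class);
   ParquetOutputFormat.setBlockSize(job, 128 * 1024 * 1024);    // 128MB row group
   ParquetOutputFormat.setPageSize(job, 1024 * 1024);           // 1MB page
   ParquetOutputFormat.setDictionaryPageSize(job, 1024 * 1024); // 1MB dictionary page
   ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
   ParquetOutputFormat.setEnableDictionary(job, true);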

Thanks

On Wed, Jan 6, 2016 at 4:23 PM, Yan Qi <[email protected]> wrote:

Hi Reuben,

Thanks for your quick reply! :)

The table has nested columns with the following Avro schema:

{
  "namespace": "profile.avro.parquet.model",
  "type": "record",
  "name": "Profile",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "M1", "type": ["Market", "null"]},
    {"name": "M2", "type": ["Market", "null"]},
    .......
    .......
    {"name": "M100", "type": ["Market", "null"]}
  ]
}

{
  "namespace": "profile.avro.parquet.model",
  "type": "record",
  "name": "Market",
  "fields": [
    {"name": "item1", "type": [{"type": "array", "items": "Client"}, "null"]},
    {"name": "item2", "type": [{"type": "array", "items": "Client"}, "null"]},
    {"name": "item3", "type": [{"type": "array", "items": "Client"}, "null"]},
    {"name": "item4", "type": [{"type": "array", "items": "Client"}, "null"]},
    {"name": "item5", "type": [{"type": "array", "items": "Client"}, "null"]}
  ]
}

{
  "namespace": "profile.avro.parquet.model",
  "type": "record",
  "name": "Client",
  "fields": [
    {"name": "attribute1", "type": "int"},
    {"name": "attribute2", "type": "int"},
    {"name": "attribute3", "type": "int"},
    ......
    ......
    {"name": "attribute50", "type": "int"}
  ]
}

A record in the table may not have a value for every field. For example, a
Profile record may only have values for M1, M20 and M89, with the others
empty. When we tried to write such a record in the Parquet format, it
required a lot of memory just to get started.

We also tried another way to define the table, like:

{
  "namespace": "profile.avro.parquet.model",
  "type": "record",
  "name": "Profile",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "markets", "type": [{"type": "array", "items": "Market"}, "null"]}
  ]
}

Interestingly, it can handle the same data with much less memory. But we lose
the columnar storage benefits for the Market members, because we have to load
data from all markets no matter which market we are interested in.

I hope this gives you a rough idea of the application. My question is whether
increasing the memory size is the only option in the former case, or whether
there is a better way to define the table.

Best regards,

Yan



On Wed, Jan 6, 2016 at 12:03 PM, Reuben Kuhnert <[email protected]> wrote:

Hi Yan,

The primary concern here would be the row group size you're using for your
table. The row group size basically determines how much data is held in
memory before being flushed to disk (and this becomes an even bigger issue
if you have multiple Parquet files open simultaneously). If you could, can
you share some stats about your file with us? Let's see if we can't get you
moving again.

Thanks
Reuben

On Wed, Jan 6, 2016 at 1:54 PM, Yan Qi <[email protected]> wrote:

We are trying to create a large table in Parquet. The table has up to
thousands of columns, but each record may not be large because many of the
columns are empty. We are using Avro-Parquet for data
serialization/deserialization. However, we ran into an out-of-memory issue
when writing the data in the Parquet format.

Our understanding is that Parquet may keep an internal structure for the
table schema, which may take more memory as the table gets larger. If
that's the case, our question is:

Is there a limit to the table size that Parquet can support? If so, how
can we determine the limit?

Thanks,
Yan







--
Ryan Blue
Software Engineer
Cloudera, Inc.
